Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/24 01:05:42 UTC
[pinot] branch master updated: Reduce the disk usage for segment conversion task (#7193)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2f3774b Reduce the disk usage for segment conversion task (#7193)
2f3774b is described below
commit 2f3774b09a1122cb8d0ef00e9322f7d247f07aa7
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 23 18:05:26 2021 -0700
Reduce the disk usage for segment conversion task (#7193)
Reduce the disk usage for the segment conversion task by:
- Eliminating the file copy of input segments
- Deleting each intermediate file as soon as it is no longer needed

(A usage sketch of the reworked SegmentProcessorFramework API follows the file summary below.)
---
.../framework/SegmentProcessorFramework.java | 123 ++-----
.../framework/SegmentProcessorFrameworkTest.java | 386 +++++++--------------
.../BaseMultipleSegmentsConversionExecutor.java | 34 +-
.../tasks/BaseSingleSegmentConversionExecutor.java | 22 +-
.../merge_rollup/MergeRollupTaskExecutor.java | 38 +-
.../RealtimeToOfflineSegmentsTaskExecutor.java | 37 +-
.../command/SegmentProcessorFrameworkCommand.java | 53 ++-
7 files changed, 285 insertions(+), 408 deletions(-)
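
For readers skimming the diff: the framework's entry point changes from directory-based
(processSegments() reading an input segments dir and writing into an output dir) to
record-reader-based (process() returning the generated segment directories). A minimal
sketch of the new call pattern, pieced together from the hunks below; tableConfig, schema,
segmentDirs and workingDir are assumed to be in scope, and error handling is elided:

    // Open a PinotSegmentRecordReader over each input segment directory in place
    // (the old code first copied/untarred every segment into a mapper input dir).
    List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.size());
    for (File segmentDir : segmentDirs) {
      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
      // Do not fill null fields with default values, consistent with other record readers
      recordReader.init(segmentDir, null, null, true);
      recordReaders.add(recordReader);
    }
    SegmentProcessorConfig config =
        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
    List<File> outputSegmentDirs;
    try {
      // process() creates mapper_output/, reducer_output/ and segments_output/ under
      // workingDir and returns the directories of the generated segments.
      outputSegmentDirs = new SegmentProcessorFramework(recordReaders, config, workingDir).process();
    } finally {
      for (RecordReader recordReader : recordReaders) {
        recordReader.close();
      }
    }

The caller now owns the readers' lifecycle, which is why the executors below close them in
a finally block and why the tests rewind them between runs.
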
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index c8df450..241ead6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -20,12 +20,12 @@ package org.apache.pinot.core.segment.processing.framework;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
@@ -35,10 +35,7 @@ import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,102 +54,48 @@ import org.slf4j.LoggerFactory;
public class SegmentProcessorFramework {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);
- private final File _inputSegmentsDir;
- private final File _outputSegmentsDir;
+ private final List<RecordReader> _recordReaders;
private final SegmentProcessorConfig _segmentProcessorConfig;
-
- private final TableConfig _tableConfig;
- private final Schema _schema;
-
- private final File _baseDir;
- private final File _mapperInputDir;
private final File _mapperOutputDir;
private final File _reducerOutputDir;
+ private final File _segmentsOutputDir;
/**
- * Initializes the Segment Processor framework with input segments, output path and processing config
- * @param inputSegmentsDir directory containing the input segments. These can be tarred or untarred.
- * @param segmentProcessorConfig config for segment processing
- * @param outputSegmentsDir directory for placing the resulting segments. This should already exist.
+ * Initializes the SegmentProcessorFramework with record readers, config and working directory.
*/
- public SegmentProcessorFramework(File inputSegmentsDir, SegmentProcessorConfig segmentProcessorConfig,
- File outputSegmentsDir) {
+ public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
+ File workingDir)
+ throws IOException {
+ Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
- LOGGER.info(
- "Initializing SegmentProcessorFramework with input segments dir: {}, output segments dir: {} and segment processor config: {}",
- inputSegmentsDir.getAbsolutePath(), outputSegmentsDir.getAbsolutePath(), segmentProcessorConfig.toString());
-
- _inputSegmentsDir = inputSegmentsDir;
- Preconditions.checkState(_inputSegmentsDir.exists() && _inputSegmentsDir.isDirectory(),
- "Input path: %s must be a directory with Pinot segments", _inputSegmentsDir.getAbsolutePath());
- _outputSegmentsDir = outputSegmentsDir;
- Preconditions.checkState(
- _outputSegmentsDir.exists() && _outputSegmentsDir.isDirectory() && (_outputSegmentsDir.list().length == 0),
- "Must provide existing empty output directory: %s", _outputSegmentsDir.getAbsolutePath());
+ LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
+ recordReaders.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
+ _recordReaders = recordReaders;
_segmentProcessorConfig = segmentProcessorConfig;
- _tableConfig = segmentProcessorConfig.getTableConfig();
- _schema = segmentProcessorConfig.getSchema();
- _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_" + System.currentTimeMillis());
- FileUtils.deleteQuietly(_baseDir);
- Preconditions.checkState(_baseDir.mkdirs(), "Failed to create base directory: %s for SegmentProcessor", _baseDir);
- _mapperInputDir = new File(_baseDir, "mapper_input");
- Preconditions
- .checkState(_mapperInputDir.mkdirs(), "Failed to create mapper input directory: %s for SegmentProcessor",
- _mapperInputDir);
- _mapperOutputDir = new File(_baseDir, "mapper_output");
- Preconditions
- .checkState(_mapperOutputDir.mkdirs(), "Failed to create mapper output directory: %s for SegmentProcessor",
- _mapperOutputDir);
- _reducerOutputDir = new File(_baseDir, "reducer_output");
- Preconditions
- .checkState(_reducerOutputDir.mkdirs(), "Failed to create reducer output directory: %s for SegmentProcessor",
- _reducerOutputDir);
+ _mapperOutputDir = new File(workingDir, "mapper_output");
+ FileUtils.forceMkdir(_mapperOutputDir);
+ _reducerOutputDir = new File(workingDir, "reducer_output");
+ FileUtils.forceMkdir(_reducerOutputDir);
+ _segmentsOutputDir = new File(workingDir, "segments_output");
+ FileUtils.forceMkdir(_segmentsOutputDir);
}
/**
- * Processes segments from the input directory as per the provided configs, then puts resulting segments into the output directory
+ * Processes records from record readers per the provided config, returns the directories for the generated segments.
*/
- public void processSegments()
+ public List<File> process()
throws Exception {
- // Check for input segments
- File[] segmentFiles = _inputSegmentsDir.listFiles();
- Preconditions
- .checkState(segmentFiles != null && segmentFiles.length > 0, "Failed to find segments under input dir: %s",
- _inputSegmentsDir.getAbsolutePath());
-
// Map phase
- LOGGER.info("Beginning map phase on segments: {}", Arrays.toString(_inputSegmentsDir.list()));
- List<RecordReader> recordReaders = new ArrayList<>(segmentFiles.length);
- for (File indexDir : segmentFiles) {
- String fileName = indexDir.getName();
-
- // Untar the segments if needed
- if (!indexDir.isDirectory()) {
- if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
- indexDir = TarGzCompressionUtils.untar(indexDir, _mapperInputDir).get(0);
- } else {
- throw new IllegalStateException("Unsupported segment format: " + indexDir.getAbsolutePath());
- }
- }
-
- PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
- // NOTE: Do not fill null field with default value to be consistent with other record readers
- recordReader.init(indexDir, null, null, true);
- recordReaders.add(recordReader);
- }
- SegmentMapper mapper = new SegmentMapper(recordReaders, _segmentProcessorConfig, _mapperOutputDir);
+ LOGGER.info("Beginning map phase on {} record readers", _recordReaders.size());
+ SegmentMapper mapper = new SegmentMapper(_recordReaders, _segmentProcessorConfig, _mapperOutputDir);
Map<String, GenericRowFileManager> partitionToFileManagerMap = mapper.map();
- for (RecordReader recordReader : recordReaders) {
- recordReader.close();
- }
- FileUtils.deleteDirectory(_mapperInputDir);
// Check for mapper output files
if (partitionToFileManagerMap.isEmpty()) {
LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
- return;
+ return Collections.emptyList();
}
// Reduce phase
@@ -166,8 +109,10 @@ public class SegmentProcessorFramework {
// Segment creation phase
LOGGER.info("Beginning segment creation phase on partitions: {}", partitionToFileManagerMap.keySet());
- SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
- generatorConfig.setOutDir(_outputSegmentsDir.getPath());
+ List<File> outputSegmentDirs = new ArrayList<>();
+ SegmentGeneratorConfig generatorConfig =
+ new SegmentGeneratorConfig(_segmentProcessorConfig.getTableConfig(), _segmentProcessorConfig.getSchema());
+ generatorConfig.setOutDir(_segmentsOutputDir.getPath());
// TODO: Use NormalizedDateSegmentNameGenerator
generatorConfig.setSegmentNamePrefix(_segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix());
int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
@@ -192,18 +137,14 @@ public class SegmentProcessorFramework {
driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
passThroughTransformer, null);
driver.build();
+ outputSegmentDirs.add(driver.getOutputDirectory());
}
fileManager.cleanUp();
}
+ FileUtils.deleteDirectory(_mapperOutputDir);
+ FileUtils.deleteDirectory(_reducerOutputDir);
- LOGGER.info("Successfully converted segments from: {} to {}", Arrays.toString(_inputSegmentsDir.list()),
- Arrays.toString(_outputSegmentsDir.list()));
- }
-
- /**
- * Cleans up the Segment Processor Framework state
- */
- public void cleanup() {
- FileUtils.deleteQuietly(_baseDir);
+ LOGGER.info("Successfully created segments: {}", outputSegmentDirs);
+ return outputSegmentDirs;
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index eba34f2..d1ab033 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -20,17 +20,17 @@ package org.apache.pinot.core.segment.processing.framework;
import com.google.common.collect.Lists;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -50,20 +50,21 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
/**
* End-to-end tests for SegmentProcessorFramework
*/
public class SegmentProcessorFrameworkTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentProcessorFrameworkTest");
- private File _baseDir;
- private File _emptyInputDir;
- private File _singleSegment;
- private File _multipleSegments;
- private File _multiValueSegments;
- private File _tarredSegments;
+ private List<RecordReader> _singleSegment;
+ private List<RecordReader> _multipleSegments;
+ private List<RecordReader> _multiValueSegments;
private TableConfig _tableConfig;
private TableConfig _tableConfigNullValueEnabled;
@@ -85,6 +86,9 @@ public class SegmentProcessorFrameworkTest {
@BeforeClass
public void setup()
throws Exception {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ FileUtils.forceMkdir(TEMP_DIR);
+
_tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
_tableConfigNullValueEnabled =
new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time")
@@ -100,31 +104,14 @@ public class SegmentProcessorFrameworkTest {
.addMetric("clicks", DataType.INT, 1000)
.addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
- _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_framework_test_" + System.currentTimeMillis());
- FileUtils.deleteQuietly(_baseDir);
- assertTrue(_baseDir.mkdirs());
-
// create segments in many folders
- _emptyInputDir = new File(_baseDir, "empty_input");
- assertTrue(_emptyInputDir.mkdirs());
- _singleSegment = new File(_baseDir, "single_segment");
- createInputSegments(_singleSegment, _rawData, 1, _schema);
- _multipleSegments = new File(_baseDir, "multiple_segments");
- createInputSegments(_multipleSegments, _rawData, 3, _schema);
- _multiValueSegments = new File(_baseDir, "multi_value_segment");
- createInputSegments(_multiValueSegments, _rawDataMultiValue, 1, _schemaMV);
- _tarredSegments = new File(_baseDir, "tarred_segment");
- createInputSegments(_tarredSegments, _rawData, 3, _schema);
- File[] segmentDirs = _tarredSegments.listFiles();
- assertNotNull(segmentDirs);
- for (File segmentDir : segmentDirs) {
- TarGzCompressionUtils.createTarGzFile(segmentDir,
- new File(_tarredSegments, segmentDir.getName() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
- FileUtils.deleteQuietly(segmentDir);
- }
+ _singleSegment = createInputSegments(new File(TEMP_DIR, "single_segment"), _rawData, 1, _schema);
+ _multipleSegments = createInputSegments(new File(TEMP_DIR, "multiple_segments"), _rawData, 3, _schema);
+ _multiValueSegments =
+ createInputSegments(new File(TEMP_DIR, "multi_value_segment"), _rawDataMultiValue, 1, _schemaMV);
}
- private void createInputSegments(File inputDir, List<Object[]> rawData, int numSegments, Schema schema)
+ private List<RecordReader> createInputSegments(File inputDir, List<Object[]> rawData, int numSegments, Schema schema)
throws Exception {
assertTrue(inputDir.mkdirs());
@@ -145,6 +132,7 @@ public class SegmentProcessorFrameworkTest {
dataLists.add(dataList);
}
+ List<RecordReader> segmentRecordReaders = new ArrayList<>(dataLists.size());
int idx = 0;
for (List<GenericRow> inputRows : dataLists) {
RecordReader recordReader = new GenericRowRecordReader(inputRows);
@@ -155,11 +143,11 @@ public class SegmentProcessorFrameworkTest {
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig, recordReader);
driver.build();
+ PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader();
+ segmentRecordReader.init(driver.getOutputDirectory(), null, null, true);
+ segmentRecordReaders.add(segmentRecordReader);
}
-
- File[] files = inputDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, numSegments);
+ return segmentRecordReaders;
}
private GenericRow getGenericRow(Object[] rawRow) {
@@ -170,100 +158,26 @@ public class SegmentProcessorFrameworkTest {
return row;
}
- @Test
- public void testBadInputFolders()
- throws Exception {
- SegmentProcessorConfig config;
-
- try {
- new SegmentProcessorConfig.Builder().setSchema(_schema).build();
- fail("Should fail for missing tableConfig");
- } catch (IllegalStateException e) {
- // expected
- }
-
- try {
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).build();
- fail("Should fail for missing schema");
- } catch (IllegalStateException e) {
- // expected
- }
-
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
-
- File outputSegmentDir = new File(_baseDir, "output_directory_bad_input_folders");
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
-
- // non-existent input dir
- File nonExistent = new File(_baseDir, "non_existent");
- try {
- new SegmentProcessorFramework(nonExistent, config, outputSegmentDir);
- fail("Should fail for non existent input dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // file used as input dir
- File fileInput = new File(_baseDir, "file.txt");
- assertTrue(fileInput.createNewFile());
- try {
- new SegmentProcessorFramework(fileInput, config, outputSegmentDir);
- fail("Should fail for file used as input dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // non existent output dir
- try {
- new SegmentProcessorFramework(_singleSegment, config, nonExistent);
- fail("Should fail for non existent output dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // file used as output dir
- try {
- new SegmentProcessorFramework(_singleSegment, config, fileInput);
- fail("Should fail for file used as output dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // output dir not empty
- try {
- new SegmentProcessorFramework(fileInput, config, _singleSegment);
- fail("Should fail for output dir not empty");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // empty input dir
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_emptyInputDir, config, outputSegmentDir);
- try {
- framework.processSegments();
- fail("Should fail for empty input");
- } catch (Exception e) {
- framework.cleanup();
+ private void rewindRecordReaders(List<RecordReader> recordReaders)
+ throws IOException {
+ for (RecordReader recordReader : recordReaders) {
+ recordReader.rewind();
}
}
@Test
public void testSingleSegment()
throws Exception {
- File outputSegmentDir = new File(_baseDir, "single_segment_output");
- FileUtils.forceMkdir(outputSegmentDir);
+ File workingDir = new File(TEMP_DIR, "single_segment_output");
+ FileUtils.forceMkdir(workingDir);
// Default configs
SegmentProcessorConfig config =
new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- File[] files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- ImmutableSegment segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ List<File> outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 10);
ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -286,18 +200,16 @@ public class SegmentProcessorFrameworkTest {
assertNull(timeDataSource.getNullValueVector());
assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
segment.destroy();
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Default configs - null value enabled
config =
new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema).build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 10);
campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -326,70 +238,62 @@ public class SegmentProcessorFrameworkTest {
assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
segment.destroy();
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Time filter
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L, 1597881600000L).build())
.build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- segmentMetadata = new SegmentMetadataImpl(files[0]);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 5);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
assertEquals(timeMetadata.getCardinality(), 5);
assertEquals(timeMetadata.getMinValue(), 1597795200000L);
assertEquals(timeMetadata.getMaxValue(), 1597878000000L);
assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597878000000_0");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Time filter - filtered everything
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597968000000L, 1598054400000L).build())
.build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 0);
- FileUtils.cleanDirectory(outputSegmentDir);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertTrue(outputSegments.isEmpty());
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Time round
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
.setTimeHandlerConfig(new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setRoundBucketMs(86400000).build())
.build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- segmentMetadata = new SegmentMetadataImpl(files[0]);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 10);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
assertEquals(timeMetadata.getCardinality(), 3);
assertEquals(timeMetadata.getMinValue(), 1597708800000L);
assertEquals(timeMetadata.getMaxValue(), 1597881600000L);
assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597881600000_0");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Time partition
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setPartitionBucketMs(86400000).build()).build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 3);
- Arrays.sort(files);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 3);
+ outputSegments.sort(null);
// segment 0
- segmentMetadata = new SegmentMetadataImpl(files[0]);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 3);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
assertEquals(timeMetadata.getCardinality(), 3);
@@ -397,7 +301,7 @@ public class SegmentProcessorFrameworkTest {
assertEquals(timeMetadata.getMaxValue(), 1597777200000L);
assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597777200000_0");
// segment 1
- segmentMetadata = new SegmentMetadataImpl(files[1]);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
assertEquals(segmentMetadata.getTotalDocs(), 5);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
assertEquals(timeMetadata.getCardinality(), 5);
@@ -405,14 +309,15 @@ public class SegmentProcessorFrameworkTest {
assertEquals(timeMetadata.getMaxValue(), 1597878000000L);
assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597878000000_1");
// segment 2
- segmentMetadata = new SegmentMetadataImpl(files[2]);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
assertEquals(segmentMetadata.getTotalDocs(), 2);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
assertEquals(timeMetadata.getCardinality(), 2);
assertEquals(timeMetadata.getMinValue(), 1597881600000L);
assertEquals(timeMetadata.getMaxValue(), 1597892400000L);
assertEquals(segmentMetadata.getName(), "myTable_1597881600000_1597892400000_2");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Time filter, round, partition, rollup - null value enabled
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema)
@@ -420,15 +325,12 @@ public class SegmentProcessorFrameworkTest {
new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597708800000L, 1597881600000L)
.setRoundBucketMs(86400000).setPartitionBucketMs(86400000).build()).setMergeType(MergeType.ROLLUP)
.build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 2);
- Arrays.sort(files);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 2);
+ outputSegments.sort(null);
// segment 0
- segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 2);
campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -458,7 +360,7 @@ public class SegmentProcessorFrameworkTest {
assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597708800000_0");
segment.destroy();
// segment 1
- segment = ImmutableSegmentLoader.load(files[1], ReadMode.mmap);
+ segment = ImmutableSegmentLoader.load(outputSegments.get(1), ReadMode.mmap);
segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 3);
campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -487,105 +389,95 @@ public class SegmentProcessorFrameworkTest {
assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_1");
segment.destroy();
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Time round, dedup
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
.setTimeHandlerConfig(new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setRoundBucketMs(86400000).build())
.setMergeType(MergeType.DEDUP).build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- segmentMetadata = new SegmentMetadataImpl(files[0]);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 8);
assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597881600000_0");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
// Segment config
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setSegmentConfig(
new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).setSegmentNamePrefix("myPrefix").build()).build();
- framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 3);
- Arrays.sort(files);
- segmentMetadata = new SegmentMetadataImpl(files[0]);
+ framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 3);
+ outputSegments.sort(null);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 4);
assertEquals(segmentMetadata.getName(), "myPrefix_1597719600000_1597795200000_0");
- segmentMetadata = new SegmentMetadataImpl(files[1]);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
assertEquals(segmentMetadata.getTotalDocs(), 4);
assertEquals(segmentMetadata.getName(), "myPrefix_1597802400000_1597878000000_1");
- segmentMetadata = new SegmentMetadataImpl(files[2]);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
assertEquals(segmentMetadata.getTotalDocs(), 2);
assertEquals(segmentMetadata.getName(), "myPrefix_1597881600000_1597892400000_2");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
}
@Test
public void testMultipleSegments()
throws Exception {
- File outputSegmentDir = new File(_baseDir, "multiple_segments_output");
- FileUtils.forceMkdir(outputSegmentDir);
+ File workingDir = new File(TEMP_DIR, "multiple_segments_output");
+ FileUtils.forceMkdir(workingDir);
// Default configs
SegmentProcessorConfig config =
new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_multipleSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- File[] files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- SegmentMetadata segmentMetadata = new SegmentMetadataImpl(files[0]);
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
+ List<File> outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ SegmentMetadata segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 10);
assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_multipleSegments);
// Time round, partition, rollup
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setRoundBucketMs(86400000).setPartitionBucketMs(86400000)
.build()).setMergeType(MergeType.ROLLUP).build();
- framework = new SegmentProcessorFramework(_multipleSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 3);
- Arrays.sort(files);
- segmentMetadata = new SegmentMetadataImpl(files[0]);
+ framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 3);
+ outputSegments.sort(null);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 2);
assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597708800000_0");
- segmentMetadata = new SegmentMetadataImpl(files[1]);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
assertEquals(segmentMetadata.getTotalDocs(), 3);
assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_1");
- segmentMetadata = new SegmentMetadataImpl(files[2]);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
assertEquals(segmentMetadata.getTotalDocs(), 2);
assertEquals(segmentMetadata.getName(), "myTable_1597881600000_1597881600000_2");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_multipleSegments);
}
@Test
public void testMultiValue()
throws Exception {
- File outputSegmentDir = new File(_baseDir, "output_directory_multi_value");
- FileUtils.forceMkdir(outputSegmentDir);
+ File workingDir = new File(TEMP_DIR, "output_directory_multi_value");
+ FileUtils.forceMkdir(workingDir);
// Rollup - null value enabled
SegmentProcessorConfig config =
new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schemaMV)
.setMergeType(MergeType.ROLLUP).build();
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- File[] files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- ImmutableSegment segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
+ List<File> outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 2);
ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -614,18 +506,16 @@ public class SegmentProcessorFrameworkTest {
assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_0");
segment.destroy();
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_multiValueSegments);
// Dedup
config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schemaMV)
.setMergeType(MergeType.DEDUP).build();
- framework = new SegmentProcessorFramework(_multiValueSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 2);
campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -642,32 +532,22 @@ public class SegmentProcessorFrameworkTest {
assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_0");
segment.destroy();
- FileUtils.cleanDirectory(outputSegmentDir);
- }
-
- @Test
- public void testTarredSegments()
- throws Exception {
- File outputSegmentDir = new File(_baseDir, "output_directory_tarred");
- FileUtils.forceMkdir(outputSegmentDir);
-
- // Default configs
- SegmentProcessorConfig config =
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_tarredSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- File[] files = outputSegmentDir.listFiles();
- assertNotNull(files);
- assertEquals(files.length, 1);
- SegmentMetadata segmentMetadata = new SegmentMetadataImpl(files[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 10);
- assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
- FileUtils.cleanDirectory(outputSegmentDir);
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_multiValueSegments);
}
@AfterClass
- public void tearDown() {
- FileUtils.deleteQuietly(_baseDir);
+ public void tearDown()
+ throws IOException {
+ for (RecordReader recordReader : _singleSegment) {
+ recordReader.close();
+ }
+ for (RecordReader recordReader : _multipleSegments) {
+ recordReader.close();
+ }
+ for (RecordReader recordReader : _multiValueSegments) {
+ recordReader.close();
+ }
+ FileUtils.deleteQuietly(TEMP_DIR);
}
}
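
A side effect of handing record readers to the framework is visible throughout the test
changes above: a reader is consumed by process(), so running the framework again over the
same inputs requires rewinding the readers first. A short sketch, under the same
assumptions as the earlier one:

    SegmentProcessorFramework framework = new SegmentProcessorFramework(recordReaders, config, workingDir);
    List<File> outputSegments = framework.process();
    // ... verify outputSegments ...
    FileUtils.cleanDirectory(workingDir);
    // Rewind before the next process() call over the same inputs
    for (RecordReader recordReader : recordReaders) {
      recordReader.rewind();
    }
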
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 4382bd7..2105623 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -61,12 +61,12 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
* Converts the segment based on the given {@link PinotTaskConfig}.
*
* @param pinotTaskConfig Task config
- * @param originalIndexDir Index directory for the original segment
+ * @param segmentDirs Index directories for the original segments
* @param workingDir Working directory for the converted segment
* @return a list of segment conversion result
* @throws Exception
*/
- protected abstract List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDir,
+ protected abstract List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
throws Exception;
@@ -106,7 +106,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
try {
- List<File> inputSegmentFiles = new ArrayList<>();
+ List<File> inputSegmentDirs = new ArrayList<>();
for (int i = 0; i < downloadURLs.length; i++) {
// Download the segment file
File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile_" + i);
@@ -116,13 +116,23 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
// Un-tar the segment file
File segmentDir = new File(tempDataDir, "segmentDir_" + i);
File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0);
- inputSegmentFiles.add(indexDir);
+ inputSegmentDirs.add(indexDir);
+ if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
+ LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath());
+ }
}
// Convert the segments
File workingDir = new File(tempDataDir, "workingDir");
Preconditions.checkState(workingDir.mkdir());
- List<SegmentConversionResult> segmentConversionResults = convert(pinotTaskConfig, inputSegmentFiles, workingDir);
+ List<SegmentConversionResult> segmentConversionResults = convert(pinotTaskConfig, inputSegmentDirs, workingDir);
+
+ // Delete the input segments
+ for (File inputSegmentDir : inputSegmentDirs) {
+ if (!FileUtils.deleteQuietly(inputSegmentDir)) {
+ LOGGER.warn("Failed to delete input segment: {}", inputSegmentDir.getAbsolutePath());
+ }
+ }
// Create a directory for converted tarred segment files
File convertedTarredSegmentDir = new File(tempDataDir, "convertedTarredSegmentDir");
@@ -132,11 +142,14 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
List<File> tarredSegmentFiles = new ArrayList<>(numOutputSegments);
for (SegmentConversionResult segmentConversionResult : segmentConversionResults) {
// Tar the converted segment
- File convertedIndexDir = segmentConversionResult.getFile();
+ File convertedSegmentDir = segmentConversionResult.getFile();
File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
segmentConversionResult.getSegmentName() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(convertedIndexDir, convertedSegmentTarFile);
+ TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedSegmentTarFile);
tarredSegmentFiles.add(convertedSegmentTarFile);
+ if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
+ LOGGER.warn("Failed to delete converted segment: {}", convertedSegmentDir.getAbsolutePath());
+ }
}
// Check whether the task get cancelled before uploading the segment
@@ -153,8 +166,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
List<String> segmentsTo =
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
- lineageEntryId = SegmentConversionUtils
- .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
+ new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
}
// Upload the tarred segments
@@ -184,6 +197,9 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
SegmentConversionUtils
.uploadSegment(configs, FileUploadDownloadClient.makeAuthHeader(authToken), parameters, tableNameWithType,
resultSegmentName, uploadURL, convertedTarredSegmentFile);
+ if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+ LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
+ }
}
// Update the segment lineage to indicate that the segment replacement is done.
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index eae3e6e..381ed58 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -94,6 +94,9 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
// Un-tar the segment file
File segmentDir = new File(tempDataDir, "segmentDir");
File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0);
+ if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
+ LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath());
+ }
// Convert the segment
File workingDir = new File(tempDataDir, "workingDir");
@@ -103,9 +106,19 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
"Converted segment name: %s does not match original segment name: %s",
segmentConversionResult.getSegmentName(), segmentName);
+ // Delete the input segment
+ if (!FileUtils.deleteQuietly(indexDir)) {
+ LOGGER.warn("Failed to delete input segment: {}", indexDir.getAbsolutePath());
+ }
+
// Tar the converted segment
- File convertedSegmentTarFile = new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- TarGzCompressionUtils.createTarGzFile(segmentConversionResult.getFile(), convertedSegmentTarFile);
+ File convertedSegmentDir = segmentConversionResult.getFile();
+ File convertedTarredSegmentFile =
+ new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedTarredSegmentFile);
+ if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
+ LOGGER.warn("Failed to delete converted segment: {}", convertedSegmentDir.getAbsolutePath());
+ }
// Check whether the task get cancelled before uploading the segment
if (_cancelled) {
@@ -141,7 +154,10 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
// Upload the tarred segment
SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, tableNameWithType, segmentName, uploadURL,
- convertedSegmentTarFile);
+ convertedTarredSegmentFile);
+ if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+ LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
+ }
LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName);
return segmentConversionResult;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
index c1e7587..86d6af4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
@@ -18,13 +18,11 @@
*/
package org.apache.pinot.plugin.minion.tasks.merge_rollup;
-import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
@@ -34,8 +32,10 @@ import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramew
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +45,9 @@ import org.slf4j.LoggerFactory;
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
- private static final String INPUT_SEGMENTS_DIR = "input_segments";
- private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
- protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
+ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
throws Exception {
String taskType = pinotTaskConfig.getTaskType();
@@ -83,30 +81,28 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
- File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
- Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
- inputSegmentsDir.getAbsolutePath(), taskType);
- for (File indexDir : originalIndexDirs) {
- FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+ List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.size());
+ for (File segmentDir : segmentDirs) {
+ PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+ // NOTE: Do not fill null field with default value to be consistent with other record readers
+ recordReader.init(segmentDir, null, null, true);
+ recordReaders.add(recordReader);
}
- File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
- Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
- outputSegmentsDir.getAbsolutePath(), taskType);
-
- SegmentProcessorFramework segmentProcessorFramework =
- new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+ List<File> outputSegmentDirs;
try {
- segmentProcessorFramework.processSegments();
+ outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
} finally {
- segmentProcessorFramework.cleanup();
+ for (RecordReader recordReader : recordReaders) {
+ recordReader.close();
+ }
}
long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
List<SegmentConversionResult> results = new ArrayList<>();
- for (File file : outputSegmentsDir.listFiles()) {
- String outputSegmentName = file.getName();
- results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
+ for (File outputSegmentDir : outputSegmentDirs) {
+ String outputSegmentName = outputSegmentDir.getName();
+ results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
index 2a24937..aefd917 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
@@ -38,8 +37,10 @@ import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +68,6 @@ import org.slf4j.LoggerFactory;
*/
public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
- private static final String INPUT_SEGMENTS_DIR = "input_segments";
- private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
private int _expectedVersion = Integer.MIN_VALUE;
@@ -105,7 +104,7 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
}
@Override
- protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
+ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
throws Exception {
String taskType = pinotTaskConfig.getTaskType();
@@ -149,30 +148,28 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
- File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
- Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
- inputSegmentsDir.getAbsolutePath(), taskType);
- for (File indexDir : originalIndexDirs) {
- FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+ List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.size());
+ for (File segmentDir : segmentDirs) {
+ PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+ // NOTE: Do not fill null field with default value to be consistent with other record readers
+ recordReader.init(segmentDir, null, null, true);
+ recordReaders.add(recordReader);
}
- File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
- Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
- outputSegmentsDir.getAbsolutePath(), taskType);
-
- SegmentProcessorFramework segmentProcessorFramework =
- new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+ List<File> outputSegmentDirs;
try {
- segmentProcessorFramework.processSegments();
+ outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
} finally {
- segmentProcessorFramework.cleanup();
+ for (RecordReader recordReader : recordReaders) {
+ recordReader.close();
+ }
}
long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
List<SegmentConversionResult> results = new ArrayList<>();
- for (File file : outputSegmentsDir.listFiles()) {
- String outputSegmentName = file.getName();
- results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
+ for (File outputSegmentDir : outputSegmentDirs) {
+ String outputSegmentName = outputSegmentDir.getName();
+ results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
.setTableNameWithType(offlineTableName).build());
}
return results;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
index 75af4a1..81ef424 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
@@ -18,11 +18,19 @@
*/
package org.apache.pinot.tools.admin.command;
+import com.google.common.base.Preconditions;
import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Command;
@@ -72,25 +80,42 @@ public class SegmentProcessorFrameworkCommand extends AbstractBaseAdminCommand i
@Override
public boolean execute()
throws Exception {
+ PluginManager.get().init();
SegmentProcessorFrameworkSpec segmentProcessorFrameworkSpec =
JsonUtils.fileToObject(new File(_segmentProcessorFrameworkSpec), SegmentProcessorFrameworkSpec.class);
File inputSegmentsDir = new File(segmentProcessorFrameworkSpec.getInputSegmentsDir());
File outputSegmentsDir = new File(segmentProcessorFrameworkSpec.getOutputSegmentsDir());
- if (!outputSegmentsDir.exists()) {
- if (!outputSegmentsDir.mkdirs()) {
- throw new RuntimeException(
- "Did not find output directory, and could not create it either: " + segmentProcessorFrameworkSpec
- .getOutputSegmentsDir());
+ File workingDir = new File(outputSegmentsDir, "tmp-" + UUID.randomUUID());
+ File untarredSegmentsDir = new File(workingDir, "untarred_segments");
+ FileUtils.forceMkdir(untarredSegmentsDir);
+ File[] segmentDirs = inputSegmentsDir.listFiles();
+ Preconditions
+ .checkState(segmentDirs != null && segmentDirs.length > 0, "Failed to find files under input segments dir: %s",
+ inputSegmentsDir.getAbsolutePath());
+ List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.length);
+ for (File segmentDir : segmentDirs) {
+ String fileName = segmentDir.getName();
+
+ // Untar the segments if needed
+ if (!segmentDir.isDirectory()) {
+ if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+ segmentDir = TarGzCompressionUtils.untar(segmentDir, untarredSegmentsDir).get(0);
+ } else {
+ throw new IllegalStateException("Unsupported segment format: " + segmentDir.getAbsolutePath());
+ }
}
- }
- PluginManager.get().init();
+ PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+ // NOTE: Do not fill null field with default value to be consistent with other record readers
+ recordReader.init(segmentDir, null, null, true);
+ recordReaders.add(recordReader);
+ }
- Schema schema = Schema.fromFile(new File(segmentProcessorFrameworkSpec.getSchemaFile()));
TableConfig tableConfig =
JsonUtils.fileToObject(new File(segmentProcessorFrameworkSpec.getTableConfigFile()), TableConfig.class);
+ Schema schema = Schema.fromFile(new File(segmentProcessorFrameworkSpec.getSchemaFile()));
SegmentProcessorConfig segmentProcessorConfig =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
.setTimeHandlerConfig(segmentProcessorFrameworkSpec.getTimeHandlerConfig())
@@ -100,16 +125,22 @@ public class SegmentProcessorFrameworkCommand extends AbstractBaseAdminCommand i
.setSegmentConfig(segmentProcessorFrameworkSpec.getSegmentConfig()).build();
SegmentProcessorFramework framework =
- new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+ new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir);
try {
LOGGER.info("Starting processing segments via SegmentProcessingFramework");
- framework.processSegments();
+ List<File> outputSegmentDirs = framework.process();
+ for (File outputSegmentDir : outputSegmentDirs) {
+ FileUtils.moveDirectory(outputSegmentDir, new File(outputSegmentsDir, outputSegmentDir.getName()));
+ }
LOGGER.info("Finished processing segments via SegmentProcessingFramework");
} catch (Exception e) {
LOGGER.error("Caught exception when running SegmentProcessingFramework. Exiting", e);
return false;
} finally {
- framework.cleanup();
+ for (RecordReader recordReader : recordReaders) {
+ recordReader.close();
+ }
+ FileUtils.deleteQuietly(workingDir);
}
return true;
}
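
Taken together, the executor changes implement the second bullet of the commit message:
each intermediate artifact is deleted as soon as the next stage has consumed it, so disk
usage at any point is bounded by the artifacts still in flight rather than by the sum of
all stages. An abbreviated sketch of the single-segment flow, where downloadSegment(...)
and uploadSegment(...) are placeholders for the actual download and
SegmentConversionUtils.uploadSegment(...) calls (logging, warnings on failed deletes and
cancellation checks elided):

    File tarredSegmentFile = downloadSegment(downloadURL, tempDataDir);  // placeholder for the real download
    File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0);
    FileUtils.deleteQuietly(tarredSegmentFile);        // tarball consumed by untar
    SegmentConversionResult result = convert(pinotTaskConfig, indexDir, workingDir);
    FileUtils.deleteQuietly(indexDir);                 // input segment consumed by convert
    File convertedTarFile = new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
    TarGzCompressionUtils.createTarGzFile(result.getFile(), convertedTarFile);
    FileUtils.deleteQuietly(result.getFile());         // converted segment dir consumed by tar
    uploadSegment(convertedTarFile);                   // placeholder for the real upload
    FileUtils.deleteQuietly(convertedTarFile);         // tarball consumed by upload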