You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/02/05 01:57:59 UTC
[pinot] branch master updated: Improve progress reporter in SegmentCreationMapper (#8129)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8bbf93a Improve progress reporter in SegmentCreationMapper (#8129)
8bbf93a is described below
commit 8bbf93aa4377dbdf597e7940670893330452b33f
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Fri Feb 4 17:53:13 2022 -0800
Improve progress reporter in SegmentCreationMapper (#8129)
Currently, the progress reporter only runs during the segment
creation phase and is stopped once that phase finishes. However,
tarring the segment and copying it from the local to the remote
location can take a long time for a large file. This change makes
the progress reporter run until the end of `map()` in
SegmentCreationMapper.
---
.../hadoop/job/mappers/SegmentCreationMapper.java | 57 +++++++++++-----------
1 file changed, 29 insertions(+), 28 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 4291790..e2232f6 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -275,6 +275,35 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
driver.init(segmentGeneratorConfig);
validateSchema(driver.getIngestionSchemaValidator());
driver.build();
+ String segmentName = driver.getSegmentName();
+ _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+ File localSegmentDir = new File(_localSegmentDir, segmentName);
+ String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+ File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
+ _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+
+ Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+ if (_useRelativePath) {
+ Path relativeOutputPath =
+ getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(),
+ hdfsInputFile.toUri(), _hdfsSegmentTarDir);
+ hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+ }
+ _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+ FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
+ .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+ context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
+ _logger
+ .info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+ sequenceId);
} catch (Exception e) {
_logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
sequenceId, e);
@@ -286,34 +315,6 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
_logger.error("Failed to interrupt progress reporter thread: {}", progressReporterThread);
}
}
- String segmentName = driver.getSegmentName();
- _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
-
- File localSegmentDir = new File(_localSegmentDir, segmentName);
- String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
- File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
- _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-
- long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
- long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
- _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
- DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-
- Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
- if (_useRelativePath) {
- Path relativeOutputPath =
- getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
- _hdfsSegmentTarDir);
- hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
- }
- _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
- FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
- .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
-
- context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
- _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
- sequenceId);
}
protected FileFormat getFileFormat(String fileName) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org