You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/02/05 01:57:59 UTC

[pinot] branch master updated: Improve progress reporter in SegmentCreationMapper (#8129)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bbf93a  Improve progress reporter in SegmentCreationMapper (#8129)
8bbf93a is described below

commit 8bbf93aa4377dbdf597e7940670893330452b33f
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Fri Feb 4 17:53:13 2022 -0800

    Improve progress reporter in SegmentCreationMapper (#8129)
    
    Currently, the progress reporter only runs during the segment
    creation phase, and we stop the reporter once that phase ends.
    However, tarring the segment and copying it from the local to the
    remote location can take a long time for large files. This code
    change makes the progress reporter run until the end of `map()`
    in SegmentCreationMapper.
---
 .../hadoop/job/mappers/SegmentCreationMapper.java  | 57 +++++++++++-----------
 1 file changed, 29 insertions(+), 28 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 4291790..e2232f6 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -275,6 +275,35 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
       driver.init(segmentGeneratorConfig);
       validateSchema(driver.getIngestionSchemaValidator());
       driver.build();
+      String segmentName = driver.getSegmentName();
+      _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
+
+      File localSegmentDir = new File(_localSegmentDir, segmentName);
+      String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
+      File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
+      _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+      TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+
+      long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+      long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+      _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+          DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+
+      Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+      if (_useRelativePath) {
+        Path relativeOutputPath =
+            getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(),
+                hdfsInputFile.toUri(), _hdfsSegmentTarDir);
+        hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+      }
+      _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
+      FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
+          .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+
+      context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
+      _logger
+          .info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
+              sequenceId);
     } catch (Exception e) {
       _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
           sequenceId, e);
@@ -286,34 +315,6 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
         _logger.error("Failed to interrupt progress reporter thread: {}", progressReporterThread);
       }
     }
-    String segmentName = driver.getSegmentName();
-    _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
-
-    File localSegmentDir = new File(_localSegmentDir, segmentName);
-    String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
-    File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
-    _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-    TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-
-    long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-    long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-    _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-        DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-
-    Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
-    if (_useRelativePath) {
-      Path relativeOutputPath =
-          getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
-              _hdfsSegmentTarDir);
-      hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
-    }
-    _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
-    FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
-        .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
-
-    context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
-    _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
-        sequenceId);
   }
 
   protected FileFormat getFileFormat(String fileName) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org