You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/14 02:38:17 UTC

[incubator-pinot] branch master updated: Add progress reporter to SegmentCreationMapper (#3835)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3217d80  Add progress reporter to SegmentCreationMapper (#3835)
3217d80 is described below

commit 3217d805b50e247464c094c4d9736d9273c3796c
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 13 18:38:11 2019 -0800

    Add progress reporter to SegmentCreationMapper (#3835)
    
    Add a reporter to report progress during segment generation to prevent the job from getting killed
---
 .../hadoop/job/mapper/SegmentCreationMapper.java   | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index dcfab1a..bebd513 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -56,6 +56,8 @@ import org.slf4j.LoggerFactory;
 
 public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
   protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
+  protected static final String PROGRESS_REPORTER_THREAD_NAME = "pinot-hadoop-progress-reporter";
+  protected static final long PROGRESS_REPORTER_JOIN_WAIT_TIME_MS = 5_000L;
 
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
@@ -189,6 +191,11 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
 
     _logger.info("Start creating segment with sequence id: {}", sequenceId);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+
+    // Start a thread that reports progress every minute during segment generation to prevent job getting killed
+    Thread progressReporterThread = new Thread(getProgressReporter(context));
+    progressReporterThread.setName(PROGRESS_REPORTER_THREAD_NAME);
+    progressReporterThread.start();
     try {
       driver.init(segmentGeneratorConfig);
       driver.build();
@@ -196,6 +203,12 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
       _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
           sequenceId, e);
       throw new RuntimeException(e);
+    } finally {
+      progressReporterThread.interrupt();
+      progressReporterThread.join(PROGRESS_REPORTER_JOIN_WAIT_TIME_MS);
+      if (progressReporterThread.isAlive()) {
+        _logger.error("Failed to interrupt progress reporter thread: {}", progressReporterThread);
+      }
     }
     String segmentName = driver.getSegmentName();
     _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
@@ -259,6 +272,10 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     return null;
   }
 
+  protected Runnable getProgressReporter(Context context) {
+    return new ProgressReporter(context);
+  }
+
   /**
    * Can be overridden to set additional segment generator configs.
    */
@@ -272,4 +289,30 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     _logger.info("Deleting local temporary directory: {}", _localStagingDir);
     FileUtils.deleteQuietly(_localStagingDir);
   }
+
+  private static class ProgressReporter implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProgressReporter.class);
+    private static final long PROGRESS_REPORTER_INTERVAL_MS = 60_000L;
+
+    private final Context _context;
+
+    ProgressReporter(Context context) {
+      _context = context;
+    }
+
+    @Override
+    public void run() {
+      LOGGER.info("Starting progress reporter thread: {}", Thread.currentThread());
+      while (true) {
+        try {
+          Thread.sleep(PROGRESS_REPORTER_INTERVAL_MS);
+          LOGGER.info("============== Reporting progress ==============");
+          _context.progress();
+        } catch (InterruptedException e) {
+          LOGGER.info("Progress reporter thread: {} interrupted", Thread.currentThread());
+          return;
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org