You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/14 02:38:17 UTC
[incubator-pinot] branch master updated: Add progress reporter to SegmentCreationMapper (#3835)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3217d80 Add progress reporter to SegmentCreationMapper (#3835)
3217d80 is described below
commit 3217d805b50e247464c094c4d9736d9273c3796c
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 13 18:38:11 2019 -0800
Add progress reporter to SegmentCreationMapper (#3835)
Add a reporter that reports progress during segment generation to prevent the job from getting killed
---
.../hadoop/job/mapper/SegmentCreationMapper.java | 43 ++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index dcfab1a..bebd513 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -56,6 +56,8 @@ import org.slf4j.LoggerFactory;
public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
+ protected static final String PROGRESS_REPORTER_THREAD_NAME = "pinot-hadoop-progress-reporter";
+ protected static final long PROGRESS_REPORTER_JOIN_WAIT_TIME_MS = 5_000L;
protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -189,6 +191,11 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
_logger.info("Start creating segment with sequence id: {}", sequenceId);
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+
+ // Start a thread that reports progress every minute during segment generation to prevent job getting killed
+ Thread progressReporterThread = new Thread(getProgressReporter(context));
+ progressReporterThread.setName(PROGRESS_REPORTER_THREAD_NAME);
+ progressReporterThread.start();
try {
driver.init(segmentGeneratorConfig);
driver.build();
@@ -196,6 +203,12 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
_logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
sequenceId, e);
throw new RuntimeException(e);
+ } finally {
+ progressReporterThread.interrupt();
+ progressReporterThread.join(PROGRESS_REPORTER_JOIN_WAIT_TIME_MS);
+ if (progressReporterThread.isAlive()) {
+ _logger.error("Failed to interrupt progress reporter thread: {}", progressReporterThread);
+ }
}
String segmentName = driver.getSegmentName();
_logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);
@@ -259,6 +272,10 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
return null;
}
+ protected Runnable getProgressReporter(Context context) {
+ return new ProgressReporter(context);
+ }
+
/**
* Can be overridden to set additional segment generator configs.
*/
@@ -272,4 +289,30 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
_logger.info("Deleting local temporary directory: {}", _localStagingDir);
FileUtils.deleteQuietly(_localStagingDir);
}
+
+ private static class ProgressReporter implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProgressReporter.class);
+ private static final long PROGRESS_REPORTER_INTERVAL_MS = 60_000L;
+
+ private final Context _context;
+
+ ProgressReporter(Context context) {
+ _context = context;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.info("Starting progress reporter thread: {}", Thread.currentThread());
+ while (true) {
+ try {
+ Thread.sleep(PROGRESS_REPORTER_INTERVAL_MS);
+ LOGGER.info("============== Reporting progress ==============");
+ _context.progress();
+ } catch (InterruptedException e) {
+ LOGGER.info("Progress reporter thread: {} interrupted", Thread.currentThread());
+ return;
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org