You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/26 22:38:27 UTC
incubator-gobblin git commit: [GOBBLIN-445] Exclude staging files
from MR speculative execution in compaction flow
Repository: incubator-gobblin
Updated Branches:
refs/heads/master a838b4d6d -> d969f937d
[GOBBLIN-445] Exclude staging files from MR speculative execution in compaction flow
Exclude staging files from MR speculative
execution in compaction flow
Add old logic back
Fix some documentation
Closes #2320 from yukuai518/zero2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d969f937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d969f937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d969f937
Branch: refs/heads/master
Commit: d969f937d2c746b93b25d6be1f879a533f6601c5
Parents: a838b4d
Author: Kuai Yu <ku...@linkedin.com>
Authored: Mon Mar 26 15:38:17 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Mar 26 15:38:17 2018 -0700
----------------------------------------------------------------------
.../action/CompactionCompleteFileOperationAction.java | 2 +-
.../compaction/mapreduce/CompactionAvroJobConfigurator.java | 9 ++++-----
.../gobblin/compaction/mapreduce/MRCompactorJobRunner.java | 2 +-
.../java/org/apache/gobblin/scheduler/JobScheduler.java | 3 ++-
4 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index ce536a1..a21ca93 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -92,7 +92,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));
- List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs);
+ List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job, tmpPath, this.fs);
if (appendDeltaOutput) {
FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 1108e74..b8f407f 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -41,7 +41,6 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.math3.primes.Primes;
-import org.apache.gobblin.writer.WriterOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
@@ -367,21 +366,21 @@ public class CompactionAvroJobConfigurator {
}
private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
- return failedEvents.stream()
+ return path.toString().contains("_temporary") || failedEvents.stream()
.anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
}
/**
- * Remove all bad paths caused by speculative execution
+ * Get good files
* The problem happens when a speculative task attempt is initialized but then killed in the middle of processing.
* Some partial file was generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
* without being committed to its final destination at {tmp_output}/part-m-xxxx.avro.
*
* @param job Completed MR job
* @param fs File system used to list the files under tmpPath
- * @return all successful paths
+ * @return all successful files that have been committed
*/
- public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException {
+ public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs) throws IOException {
List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 32a8e0b..8a0599e 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -325,7 +325,7 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
this.submitAndWait(job);
if (shouldPublishData(compactionTimestamp)) {
// remove all invalid empty files due to speculative task execution
- List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs);
+ List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job, this.dataset.outputTmpPath(), this.tmpFs);
if (!this.recompactAllData && this.recompactFromDestPaths) {
// append new files without deleting output directory
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index f2e254d..c737431 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.source.Source;
import org.apache.hadoop.fs.Path;
import org.quartz.CronScheduleBuilder;
@@ -457,7 +458,7 @@ public class JobScheduler extends AbstractIdleService {
* @param jobProps Job configuration properties
* @param jobListener {@link JobListener} used for callback, can be <em>null</em> if no callback is needed.
* @param jobLauncher a {@link JobLauncher} object used to launch the job to run
- * @return If current job needs retriggering
+ * @return If current job is a stop-early job based on {@link Source#isEarlyStopped()}
* @throws JobException when there is anything wrong with running the job
*/
public boolean runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher)