You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/26 22:38:27 UTC

incubator-gobblin git commit: [GOBBLIN-445] Exclude staging files from MR speculative execution in compaction flow

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a838b4d6d -> d969f937d


[GOBBLIN-445] Exclude staging files from MR speculative execution in compaction flow

Exclude staging files from MR speculative
execution in compaction flow

Add old logic back

Fix some documentation

Closes #2320 from yukuai518/zero2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d969f937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d969f937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d969f937

Branch: refs/heads/master
Commit: d969f937d2c746b93b25d6be1f879a533f6601c5
Parents: a838b4d
Author: Kuai Yu <ku...@linkedin.com>
Authored: Mon Mar 26 15:38:17 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Mar 26 15:38:17 2018 -0700

----------------------------------------------------------------------
 .../action/CompactionCompleteFileOperationAction.java       | 2 +-
 .../compaction/mapreduce/CompactionAvroJobConfigurator.java | 9 ++++-----
 .../gobblin/compaction/mapreduce/MRCompactorJobRunner.java  | 2 +-
 .../java/org/apache/gobblin/scheduler/JobScheduler.java     | 3 ++-
 4 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index ce536a1..a21ca93 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -92,7 +92,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
       long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
       long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));
 
-      List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs);
+      List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job, tmpPath, this.fs);
 
       if (appendDeltaOutput) {
         FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 1108e74..b8f407f 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -41,7 +41,6 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.commons.math3.primes.Primes;
-import org.apache.gobblin.writer.WriterOutputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
@@ -367,21 +366,21 @@ public class CompactionAvroJobConfigurator {
   }
 
   private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
-    return failedEvents.stream()
+    return path.toString().contains("_temporary") || failedEvents.stream()
         .anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
   }
 
   /**
-   * Remove all bad paths caused by speculative execution
+   * Get good files, i.e. output files that were committed to their final destination.
    * The problem happens when speculative task attempt initialized but then killed in the middle of processing.
    * Some partial file was generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
    * without being committed to its final destination at {tmp_output}/part-m-xxxx.avro.
    *
    * @param job Completed MR job
    * @param fs File system that can handle file system
-   * @return all successful paths
+   * @return all successful files that have been committed
    */
-  public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException {
+  public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs) throws IOException {
     List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
 
     List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 32a8e0b..8a0599e 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -325,7 +325,7 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
         this.submitAndWait(job);
         if (shouldPublishData(compactionTimestamp)) {
           // remove all invalid empty files due to speculative task execution
-          List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs);
+          List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job, this.dataset.outputTmpPath(), this.tmpFs);
 
           if (!this.recompactAllData && this.recompactFromDestPaths) {
             // append new files without deleting output directory

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index f2e254d..c737431 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.source.Source;
 import org.apache.hadoop.fs.Path;
 
 import org.quartz.CronScheduleBuilder;
@@ -457,7 +458,7 @@ public class JobScheduler extends AbstractIdleService {
    * @param jobProps Job configuration properties
    * @param jobListener {@link JobListener} used for callback, can be <em>null</em> if no callback is needed.
    * @param jobLauncher a {@link JobLauncher} object used to launch the job to run
-   * @return If current job needs retriggering
+   * @return whether the current job is a stop-early job, based on {@link Source#isEarlyStopped()}
    * @throws JobException when there is anything wrong with running the job
    */
   public boolean runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher)