Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/03 02:56:58 UTC

incubator-gobblin git commit: [GOBBLIN-303] Remove empty avro files during compaction

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6f9acc073 -> b6e88fd06


[GOBBLIN-303] Remove empty avro files during compaction

Closes #2158 from yukuai518/empty


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b6e88fd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b6e88fd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b6e88fd0

Branch: refs/heads/master
Commit: b6e88fd062cd855a1d4499c0a5787bbf2d1e5183
Parents: 6f9acc0
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Nov 2 19:56:53 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Nov 2 19:56:53 2017 -0700

----------------------------------------------------------------------
 .../CompactionCompleteFileOperationAction.java  | 39 ++++++++----
 .../event/CompactionSlaEventHelper.java         |  1 +
 .../CompactionAvroJobConfigurator.java          | 66 ++++++++++++++++++++
 .../mapreduce/MRCompactorJobRunner.java         | 14 +++--
 4 files changed, 103 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index ceddb0d..ce536a1 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -40,10 +40,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 
 /**
@@ -83,30 +86,36 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
       boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
               MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
 
-      // Obtain record count from input file names
-      // We are not getting record count from map-reduce counter because in next run, the threshold (delta record)
-      // calculation is based on the input file names.
+      Job job = this.configurator.getConfiguredJob();
+
       long newTotalRecords = 0;
       long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
       long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));
+
+      List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs);
+
       if (appendDeltaOutput) {
         FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,
                 MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
                 FsPermission.getDefault());
         WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission);
         // append files under mr output to destination
-        List<Path> paths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
-        for (Path path: paths) {
-          String fileName = path.getName();
-          log.info(String.format("Adding %s to %s", path.toString(), dstPath));
+        for (Path filePath: goodPaths) {
+          String fileName = filePath.getName();
+          log.info(String.format("Adding %s to %s", filePath.toString(), dstPath));
           Path outPath = new Path (dstPath, fileName);
 
-          if (!this.fs.rename(path, outPath)) {
+          if (!this.fs.rename(filePath, outPath)) {
             throw new IOException(
-                    String.format("Unable to move %s to %s", path.toString(), outPath.toString()));
+                    String.format("Unable to move %s to %s", filePath.toString(), outPath.toString()));
           }
         }
 
+        // Obtain the record count from the input file names.
+        // We don't read the record count from the map-reduce counter because in the next run, the threshold
+        // (delta record) calculation is based on the input file names. By pre-defining which input folders are
+        // involved in the MR execution, it is easy to track how many files have been processed so far, which
+        // makes it possible to compute the total record count (all previous runs plus the current run).
         newTotalRecords = this.configurator.getFileNameRecordCount();
       } else {
         this.fs.delete(dstPath, true);
@@ -120,8 +129,10 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
                   String.format("Unable to move %s to %s", tmpPath, dstPath));
         }
 
-        // get record count from map reduce job counter
-        Job job = this.configurator.getConfiguredJob();
+        // Obtain the record count from the map-reduce job counter.
+        // We don't derive the record count from file names here because tracking which files are actually
+        // involved in the MR execution can be hard: newly arrived minutely data is rolled up into hourly folders,
+        // and from the daily compaction perspective we cannot tell which files are newly added (we simply pass whole hourly folders to the MR job instead of individual files).
         Counter counter = job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
         newTotalRecords = counter.getValue();
       }
@@ -129,6 +140,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
       State compactState = helper.loadState(new Path (result.getDstAbsoluteDir()));
       compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
       compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
+      compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
       helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);
 
       log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
@@ -138,12 +150,15 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
         Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
             CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
             CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
-            CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
+            CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
+            CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
         this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
       }
     }
   }
 
+
+
   public void addEventSubmitter(EventSubmitter eventSubmitter) {
     this.eventSubmitter = eventSubmitter;
   }
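
----------------------------------------------------------------------
For context, a minimal sketch of the two record-count bookkeeping modes the
comments above describe. The counter mode mirrors the committed code's use of
AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT; the file-name mode is illustrative
only (the naming scheme below is an assumption, since the real logic lives in
configurator.getFileNameRecordCount()).

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

public class RecordCountSketch {
  // Stand-in for AvroKeyMapper.EVENT_COUNTER in the real code.
  enum EVENT_COUNTER { RECORD_COUNT }

  // Counter mode: ask the completed job how many records its mappers emitted.
  static long countFromJobCounter(Job completedJob) throws Exception {
    Counter counter = completedJob.getCounters().findCounter(EVENT_COUNTER.RECORD_COUNT);
    return counter.getValue();
  }

  // File-name mode. Assumption: file names embed a record count, e.g.
  // "part-m-00000.123.avro" carrying 123 records; Gobblin's actual scheme is
  // defined by its record-count providers, not here.
  static long countFromFileNames(List<Path> inputFiles) {
    Pattern countSuffix = Pattern.compile("\\.([0-9]+)\\.avro$");
    long total = 0;
    for (Path file : inputFiles) {
      Matcher m = countSuffix.matcher(file.getName());
      if (m.find()) {
        total += Long.parseLong(m.group(1));
      }
    }
    return total;
  }
}

As the comments above explain, the append (delta) path uses the file-name mode
so that the next run's delta-record threshold stays consistent, while the full
rewrite path can rely on the job counter.
----------------------------------------------------------------------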

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index 5d89fd8..042c5a4 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -53,6 +53,7 @@ public class CompactionSlaEventHelper {
   public static final String NEED_RECOMPACT = "needRecompact";
   public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
   public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
+  public static final String MR_JOB_ID = "mrJobId";
   public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
   public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
   public static final String RENAME_DIR_PATHS = "renameDirPaths";
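
----------------------------------------------------------------------
As a quick illustration of where the new mrJobId key surfaces, a hedged sketch
of the SLA event metadata assembled in CompactionCompleteFileOperationAction
above (the constant names are from the commit; the values are placeholders):

import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;

public class SlaEventMetadataSketch {
  static Map<String, String> buildMetadata(String datasetUrn, long newTotal,
      long prevTotal, long execCount, String mrJobId) {
    // The same five keys the commit submits with COMPACTION_RECORD_COUNT_EVENT.
    return ImmutableMap.of(
        CompactionSlaEventHelper.DATASET_URN, datasetUrn,
        CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotal),
        CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(prevTotal),
        CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(execCount + 1),
        CompactionSlaEventHelper.MR_JOB_ID, mrJobId);
  }
}
----------------------------------------------------------------------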

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 9c1e362..1108e74 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -19,8 +19,11 @@ package org.apache.gobblin.compaction.mapreduce;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.mapreduce.avro.*;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
@@ -38,6 +41,7 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.commons.math3.primes.Primes;
+import org.apache.gobblin.writer.WriterOutputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
@@ -45,13 +49,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * A configurator focused on creating avro compaction map-reduce jobs
@@ -331,5 +341,61 @@ public class CompactionAvroJobConfigurator {
 
     return uncompacted;
   }
+
+  private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) {
+    List<TaskCompletionEvent> completionEvents = new LinkedList<>();
+
+    while (true) {
+      try {
+        TaskCompletionEvent[] bunchOfEvents;
+        bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size());
+        if (bunchOfEvents == null || bunchOfEvents.length == 0) {
+          break;
+        }
+        completionEvents.addAll(Arrays.asList(bunchOfEvents));
+      } catch (IOException e) {
+        break;
+      }
+    }
+
+    return completionEvents;
+  }
+
+  private static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job completedJob) {
+    return getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED).collect(
+        Collectors.toList());
+  }
+
+  private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
+    return failedEvents.stream()
+        .anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
+  }
+
+  /**
+   * Remove all bad paths caused by speculative execution.
+   * The problem happens when a speculative task attempt is initialized but then killed in the middle of
+   * processing: a partial file is generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro
+   * without being committed to its final destination at {tmp_output}/part-m-xxxx.avro.
+   * @param job completed MR job
+   * @param tmpPath temporary output directory to scan for avro files
+   * @param fs file system that holds the temporary output
+   * @return paths of the files written by successful task attempts
+   */
+  public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException {
+    List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
+
+    List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
+    List<Path> goodPaths = new ArrayList<>();
+    for (Path filePath: allFilePaths) {
+      if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
+        fs.delete(filePath, false);
+        log.error("{} is a bad path so it was deleted", filePath);
+      } else {
+        goodPaths.add(filePath);
+      }
+    }
+
+    return goodPaths;
+  }
 }
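
----------------------------------------------------------------------
A hedged usage sketch of the new helper: after a compaction MR job completes,
removeFailedPaths consults the job's TaskCompletionEvents and deletes any
.avro file under the temporary output whose path contains the attempt id of a
non-SUCCEEDED (e.g. killed speculative) task attempt. The job setup and paths
below are illustrative, not from the commit:

import java.util.List;

import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class RemoveFailedPathsUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path tmpOutput = new Path("/tmp/compaction/mr-output"); // illustrative
    Job job = Job.getInstance(conf, "avro-compaction");
    // ... input/output/mapper configuration elided ...
    job.waitForCompletion(true);

    // Keep only files written by successful task attempts.
    List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpOutput, fs);
    for (Path p : goodPaths) {
      System.out.println("publishable: " + p);
    }
  }
}

Note the pagination in getAllTaskCompletionEvent above: Job.getTaskCompletionEvents(int)
returns events in batches starting from the given offset, so the loop keeps
fetching until an empty batch comes back.
----------------------------------------------------------------------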
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 0f3592c..32a8e0b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.compaction.mapreduce;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.math3.primes.Primes;
@@ -40,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.joda.time.DateTime;
@@ -321,9 +324,12 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
         this.configureJob(job);
         this.submitAndWait(job);
         if (shouldPublishData(compactionTimestamp)) {
+          // remove all invalid empty files due to speculative task execution
+          List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs);
+
           if (!this.recompactAllData && this.recompactFromDestPaths) {
             // append new files without deleting output directory
-            addFilesInTmpPathToOutputPath();
+            addGoodFilesToOutputPath(goodPaths);
             // clean up late data from outputLateDirectory, which has been set to inputPath
             deleteFilesByPaths(this.dataset.inputPaths());
           } else {
@@ -352,7 +358,6 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
     }
   }
 
-
   /**
    * For regular compactions, compaction timestamp is the time the compaction job starts.
    *
@@ -603,9 +608,8 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
     HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs, this.dataset.outputTmpPath(), FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ;
   }
 
-  private void addFilesInTmpPathToOutputPath () throws IOException {
-    List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs);
-    for (Path path: paths) {
+  private void addGoodFilesToOutputPath (List<Path> goodPaths) throws IOException {
+    for (Path path: goodPaths) {
       String fileName = path.getName();
       LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath()));
       Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,