You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/03 02:56:58 UTC
incubator-gobblin git commit: [GOBBLIN-303] Remove empty avro files during compaction
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 6f9acc073 -> b6e88fd06
[GOBBLIN-303] Remove empty avro files during compaction
Closes #2158 from yukuai518/empty
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b6e88fd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b6e88fd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b6e88fd0
Branch: refs/heads/master
Commit: b6e88fd062cd855a1d4499c0a5787bbf2d1e5183
Parents: 6f9acc0
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Nov 2 19:56:53 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Nov 2 19:56:53 2017 -0700
----------------------------------------------------------------------
.../CompactionCompleteFileOperationAction.java | 39 ++++++++----
.../event/CompactionSlaEventHelper.java | 1 +
.../CompactionAvroJobConfigurator.java | 66 ++++++++++++++++++++
.../mapreduce/MRCompactorJobRunner.java | 14 +++--
4 files changed, 103 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index ceddb0d..ce536a1 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -40,10 +40,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
@@ -83,30 +86,36 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
- // Obtain record count from input file names
- // We are not getting record count from map-reduce counter because in next run, the threshold (delta record)
- // calculation is based on the input file names.
+ Job job = this.configurator.getConfiguredJob();
+
long newTotalRecords = 0;
long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));
+
+ List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs);
+
if (appendDeltaOutput) {
FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission);
// append files under mr output to destination
- List<Path> paths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
- for (Path path: paths) {
- String fileName = path.getName();
- log.info(String.format("Adding %s to %s", path.toString(), dstPath));
+ for (Path filePath: goodPaths) {
+ String fileName = filePath.getName();
+ log.info(String.format("Adding %s to %s", filePath.toString(), dstPath));
Path outPath = new Path (dstPath, fileName);
- if (!this.fs.rename(path, outPath)) {
+ if (!this.fs.rename(filePath, outPath)) {
throw new IOException(
- String.format("Unable to move %s to %s", path.toString(), outPath.toString()));
+ String.format("Unable to move %s to %s", filePath.toString(), outPath.toString()));
}
}
+ // Obtain record count from input file names.
+ // We don't get record count from map-reduce counter because in the next run, the threshold (delta record)
+ // calculation is based on the input file names. By pre-defining which input folders are involved in the
+ // MR execution, it is easy to track how many files are involved in MR so far, thus calculating the number of total records
+ // (all previous run + current run) is possible.
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
this.fs.delete(dstPath, true);
@@ -120,8 +129,10 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
String.format("Unable to move %s to %s", tmpPath, dstPath));
}
- // get record count from map reduce job counter
- Job job = this.configurator.getConfiguredJob();
+ // Obtain record count from map reduce job counter
+ // We don't get record count from file name because tracking which files are actually involved in the MR execution can
+ // be hard. This is due to new minutely data is rolled up to hourly folder but from daily compaction perspective we are not
+ // able to tell which file are newly added (because we simply pass all hourly folders to MR job instead of individual files).
Counter counter = job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}
@@ -129,6 +140,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
State compactState = helper.loadState(new Path (result.getDstAbsoluteDir()));
compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
+ compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);
log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
@@ -138,12 +150,15 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
- CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
+ CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
+ CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
}
}
}
+
+
public void addEventSubmitter(EventSubmitter eventSubmitter) {
this.eventSubmitter = eventSubmitter;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index 5d89fd8..042c5a4 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -53,6 +53,7 @@ public class CompactionSlaEventHelper {
public static final String NEED_RECOMPACT = "needRecompact";
public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
+ public static final String MR_JOB_ID = "mrJobId";
public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
public static final String RENAME_DIR_PATHS = "renameDirPaths";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 9c1e362..1108e74 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -19,8 +19,11 @@ package org.apache.gobblin.compaction.mapreduce;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
+
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.mapreduce.avro.*;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
@@ -38,6 +41,7 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.math3.primes.Primes;
+import org.apache.gobblin.writer.WriterOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
@@ -45,13 +49,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* A configurator that focused on creating avro compaction map-reduce job
@@ -331,5 +341,61 @@ public class CompactionAvroJobConfigurator {
return uncompacted;
}
+
+ private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) {
+ List<TaskCompletionEvent> completionEvents = new LinkedList<>();
+
+ while (true) {
+ try {
+ TaskCompletionEvent[] bunchOfEvents;
+ bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size());
+ if (bunchOfEvents == null || bunchOfEvents.length == 0) {
+ break;
+ }
+ completionEvents.addAll(Arrays.asList(bunchOfEvents));
+ } catch (IOException e) {
+ break;
+ }
+ }
+
+ return completionEvents;
+ }
+
+ private static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job completedJob) {
+ return getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED).collect(
+ Collectors.toList());
+ }
+
+ private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
+ return failedEvents.stream()
+ .anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
+ }
+
+ /**
+ * Remove all bad paths caused by speculative execution
+ * The problem happens when speculative task attempt initialized but then killed in the middle of processing.
+ * Some partial file was generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
+ * without being committed to its final destination at {tmp_output}/part-m-xxxx.avro.
+ *
+ * @param job Completed MR job
+ * @param fs File system that can handle file system
+ * @return all successful paths
+ */
+ public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException {
+ List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
+
+ List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
+ List<Path> goodPaths = new ArrayList<>();
+ for (Path filePath: allFilePaths) {
+ if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
+ fs.delete(filePath, false);
+ log.error("{} is a bad path so it was deleted", filePath);
+ } else {
+ goodPaths.add(filePath);
+ }
+ }
+
+ return goodPaths;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 0f3592c..32a8e0b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
@@ -40,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
@@ -321,9 +324,12 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
this.configureJob(job);
this.submitAndWait(job);
if (shouldPublishData(compactionTimestamp)) {
+ // remove all invalid empty files due to speculative task execution
+ List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs);
+
if (!this.recompactAllData && this.recompactFromDestPaths) {
// append new files without deleting output directory
- addFilesInTmpPathToOutputPath();
+ addGoodFilesToOutputPath(goodPaths);
// clean up late data from outputLateDirectory, which has been set to inputPath
deleteFilesByPaths(this.dataset.inputPaths());
} else {
@@ -352,7 +358,6 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
}
}
-
/**
* For regular compactions, compaction timestamp is the time the compaction job starts.
*
@@ -603,9 +608,8 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom
HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs, this.dataset.outputTmpPath(), FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ;
}
- private void addFilesInTmpPathToOutputPath () throws IOException {
- List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs);
- for (Path path: paths) {
+ private void addGoodFilesToOutputPath (List<Path> goodPaths) throws IOException {
+ for (Path path: goodPaths) {
String fileName = path.getName();
LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath()));
Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,