You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/08/18 17:57:53 UTC
[incubator-gobblin] branch master updated:
[GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction able to
write to a new folder based on the executionCount
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 099e9db [GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction able to write to a new folder based on the executionCount
099e9db is described below
commit 099e9dbee62b1a9694f59e5b4d1ff1b6b46faa9b
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Aug 18 10:57:46 2020 -0700
[GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction able to write to a new folder based on the executionCount
Closes #3079 from ZihanLi58/GOBBLIN-1231
---
.../gobblin/configuration/ConfigurationKeys.java | 6 +++
.../CompactionCompleteFileOperationAction.java | 59 +++++++++++++---------
.../action/CompactionHiveRegistrationAction.java | 15 +++++-
.../mapreduce/AvroCompactionTaskTest.java | 36 ++++++++++++-
4 files changed, 88 insertions(+), 28 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8524f1f..a6237cf 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -994,6 +994,12 @@ public class ConfigurationKeys {
public static final String COMPACTION_PRIORITIZER_ALIAS = COMPACTION_PRIORITIZATION_PREFIX + "prioritizerAlias";
public static final String COMPACTION_ESTIMATOR = COMPACTION_PRIORITIZATION_PREFIX + "estimator";
+ /***
+ * Configuration properties related to Re-compaction
+ */
+ public static String RECOMPACTION_WRITE_TO_NEW_FOLDER = "recompaction.write.to.new.folder";
+
+
/**
* Configuration related to ConfigStore based copy/retention
*/
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 72465b4..02ac90e 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -20,15 +20,11 @@ package org.apache.gobblin.compaction.action;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -38,11 +34,13 @@ import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -59,6 +57,7 @@ import org.apache.hadoop.mapreduce.Job;
@AllArgsConstructor
public class CompactionCompleteFileOperationAction implements CompactionCompleteAction<FileSystemDataset> {
+ public final static String COMPACTION_DIRECTORY_FORMAT = "/compaction_%s";
protected WorkUnitState state;
private CompactionJobConfigurator configurator;
private InputRecordCountHelper helper;
@@ -97,7 +96,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
long newTotalRecords = 0;
long oldTotalRecords = helper.readRecordCount(new Path(result.getDstAbsoluteDir()));
- long executeCount = helper.readExecutionCount(new Path(result.getDstAbsoluteDir()));
+ long executionCount = helper.readExecutionCount(new Path(result.getDstAbsoluteDir()));
List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, tmpPath, this.fs,
ImmutableList.of(configurator.getFileExtension()));
@@ -125,9 +124,15 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
// (all previous run + current run) is possible.
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
- this.configurator.getOldFiles()
- .add(this.fs.makeQualified(dstPath).toString());
- this.fs.delete(dstPath, true);
+ if (state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false)) {
+ Path oldFilePath = PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
+ dstPath = PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
+ this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
+ //Write to a new path, no need to delete the old path
+ } else {
+ this.configurator.getOldFiles().add(this.fs.makeQualified(dstPath).toString());
+ this.fs.delete(dstPath, true);
+ }
FsPermission permission =
HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
@@ -144,31 +149,35 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
Counter counter = job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}
+ final Path finalDstPath = dstPath;
goodPaths.stream().forEach(p -> {
String fileName = p.getName();
- outputFiles.add(new Path(dstPath, fileName));
+ outputFiles.add(new Path(finalDstPath, fileName));
});
this.configurator.setDstNewFiles(outputFiles);
- State compactState = helper.loadState(new Path(result.getDstAbsoluteDir()));
- if(executeCount!=0) {
- compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executeCount), Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
- compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executeCount), Long.toString(executeCount));
- compactState.setProp("DuplicateRecordCount" + Long.toString(executeCount), compactState.getProp("DuplicateRecordCount", "null"));
+ State compactionState = helper.loadState(new Path(result.getDstAbsoluteDir()));
+ if (executionCount != 0) {
+ compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executionCount),
+ Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
+ compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executionCount),
+ Long.toString(executionCount));
+ compactionState.setProp("DuplicateRecordCount" + Long.toString(executionCount),
+ compactionState.getProp("DuplicateRecordCount", "null"));
}
- compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
- compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
- compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
+ compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
+ compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1));
+ compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
this.configurator.getConfiguredJob().getJobID().toString());
- compactState.setProp("DuplicateRecordCount", job.getCounters().findCounter(
- RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
- compactState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME, this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
- helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
- log.info("duplicated records count for "+ dstPath + " : " + compactState.getProp("DuplicateRecordCount"));
-
+ compactionState.setProp("DuplicateRecordCount",
+ job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+ compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+ this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
+ helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
+ log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp("DuplicateRecordCount"));
log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath,
- executeCount + 1);
+ executionCount + 1);
// submit events for record count
if (eventSubmitter != null) {
@@ -176,7 +185,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
- CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
+ CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1),
CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index ca32ea2..0c63107 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.util.PathUtils;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Joiner;
@@ -57,6 +59,7 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
private final State state;
private EventSubmitter eventSubmitter;
+ private InputRecordCountHelper helper;
public CompactionHiveRegistrationAction (State state) {
if (!(state instanceof WorkUnitState)) {
@@ -88,10 +91,18 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
HiveRegistrationPolicy hiveRegistrationPolicy = HiveRegistrationPolicyBase.getPolicy(state);
List<String> paths = new ArrayList<>();
- for (HiveSpec spec : hiveRegistrationPolicy.getHiveSpecs(new Path(result.getDstAbsoluteDir()))) {
+ Path dstPath = new Path(result.getDstAbsoluteDir());
+ if (state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false)) {
+ //Lazily initialize helper
+ this.helper = new InputRecordCountHelper(state);
+ long executionCount = helper.readExecutionCount(new Path(result.getDstAbsoluteDir()));
+ // Use new output path to do registration
+ dstPath = PathUtils.mergePaths(dstPath, new Path(String.format(CompactionCompleteFileOperationAction.COMPACTION_DIRECTORY_FORMAT, executionCount)));
+ }
+ for (HiveSpec spec : hiveRegistrationPolicy.getHiveSpecs(dstPath)) {
hiveRegister.register(spec);
paths.add(spec.getPath().toUri().toASCIIString());
- log.info("Hive registration is done for {}", result.getDstAbsoluteDir());
+ log.info("Hive registration is done for {}", dstPath.toString());
}
// submit events for hive registration
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 7f9ef51..145ecf1 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -173,9 +173,43 @@ public class AvroCompactionTaskTest {
Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10")));
}
+ @Test
+ public void testAvroRecompactionWriteToNewPath() throws Exception {
+ FileSystem fs = getFileSystem();
+ String basePath = "/tmp/testRecompactionWriteToNewPath";
+ fs.delete(new Path(basePath), true);
+
+ File jobDir = new File(basePath, "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");
+ Assert.assertTrue(jobDir.mkdirs());
+
+ GenericRecord r1 = createRandomRecord();
+ writeFileWithContent(jobDir, "file1", r1, 20);
+
+ EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin ("Recompaction-First", basePath);
+ embeddedGobblin.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, "true");
+ JobExecutionResult result = embeddedGobblin.run();
+ long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path (basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+ Assert.assertTrue(result.isSuccessful());
+ Assert.assertEquals(recordCount, 20);
+
+ // Now write more avro files to input dir
+ writeFileWithContent(jobDir, "file2", r1, 22);
+ EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin ("Recompaction-Second", basePath);
+ embeddedGobblin_2.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, "true");
+ embeddedGobblin_2.run();
+ Assert.assertTrue(result.isSuccessful());
+
+ // If recompaction is succeeded, a new record count should be written.
+ recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path (basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+ Assert.assertEquals(recordCount, 42);
+ //Assert both old output and new output exist
+ Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10/compaction_1")));
+ Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10/compaction_2")));
+ }
+
public void testAvroRecompactionWithLimitation() throws Exception {
FileSystem fs = getFileSystem();
- String basePath = "/tmp/testRecompaction";
+ String basePath = "/tmp/testRecompactionWithLimitation";
fs.delete(new Path(basePath), true);
File jobDir = new File(basePath, "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");