You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/08/18 17:57:53 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction be able to write to a new folder based on the executCount

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 099e9db  [GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction be able to write to a new folder based on the executCount
099e9db is described below

commit 099e9dbee62b1a9694f59e5b4d1ff1b6b46faa9b
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Aug 18 10:57:46 2020 -0700

    [GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction be able to write to a new folder based on the executCount
    
    Closes #3079 from ZihanLi58/GOBBLIN-1231
---
 .../gobblin/configuration/ConfigurationKeys.java   |  6 +++
 .../CompactionCompleteFileOperationAction.java     | 59 +++++++++++++---------
 .../action/CompactionHiveRegistrationAction.java   | 15 +++++-
 .../mapreduce/AvroCompactionTaskTest.java          | 36 ++++++++++++-
 4 files changed, 88 insertions(+), 28 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8524f1f..a6237cf 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -994,6 +994,12 @@ public class ConfigurationKeys {
   public static final String COMPACTION_PRIORITIZER_ALIAS = COMPACTION_PRIORITIZATION_PREFIX + "prioritizerAlias";
   public static final String COMPACTION_ESTIMATOR = COMPACTION_PRIORITIZATION_PREFIX + "estimator";
 
+  /**
+   * Re-compaction: when true, output is written to a new compaction_(executionCount + 1) sub-folder instead of replacing the old output.
+   */
+  public static final String RECOMPACTION_WRITE_TO_NEW_FOLDER = "recompaction.write.to.new.folder";
+
+
   /**
    * Configuration related to ConfigStore based copy/retention
    */
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 72465b4..02ac90e 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -20,15 +20,11 @@ package org.apache.gobblin.compaction.action;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -38,11 +34,13 @@ import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.WriterUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,6 +57,7 @@ import org.apache.hadoop.mapreduce.Job;
 @AllArgsConstructor
 public class CompactionCompleteFileOperationAction implements CompactionCompleteAction<FileSystemDataset> {
 
+  public final static String COMPACTION_DIRECTORY_FORMAT = "/compaction_%s";
   protected WorkUnitState state;
   private CompactionJobConfigurator configurator;
   private InputRecordCountHelper helper;
@@ -97,7 +96,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
 
       long newTotalRecords = 0;
       long oldTotalRecords = helper.readRecordCount(new Path(result.getDstAbsoluteDir()));
-      long executeCount = helper.readExecutionCount(new Path(result.getDstAbsoluteDir()));
+      long executionCount = helper.readExecutionCount(new Path(result.getDstAbsoluteDir()));
 
       List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, tmpPath, this.fs,
           ImmutableList.of(configurator.getFileExtension()));
@@ -125,9 +124,15 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
         // (all previous run + current run) is possible.
         newTotalRecords = this.configurator.getFileNameRecordCount();
       } else {
-        this.configurator.getOldFiles()
-            .add(this.fs.makeQualified(dstPath).toString());
-        this.fs.delete(dstPath, true);
+        if (state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false)) {
+          Path oldFilePath = PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
+          dstPath = PathUtils.mergePaths(dstPath, new Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
+          this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
+          //Write to a new path, no need to delete the old path
+        } else {
+          this.configurator.getOldFiles().add(this.fs.makeQualified(dstPath).toString());
+          this.fs.delete(dstPath, true);
+        }
         FsPermission permission =
             HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
                 FsPermission.getDefault());
@@ -144,31 +149,35 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
         Counter counter = job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
         newTotalRecords = counter.getValue();
       }
+      final Path finalDstPath = dstPath;
       goodPaths.stream().forEach(p -> {
         String fileName = p.getName();
-        outputFiles.add(new Path(dstPath, fileName));
+        outputFiles.add(new Path(finalDstPath, fileName));
       });
       this.configurator.setDstNewFiles(outputFiles);
 
-      State compactState = helper.loadState(new Path(result.getDstAbsoluteDir()));
-      if(executeCount!=0) {
-        compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executeCount), Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
-        compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executeCount), Long.toString(executeCount));
-        compactState.setProp("DuplicateRecordCount" + Long.toString(executeCount), compactState.getProp("DuplicateRecordCount", "null"));
+      State compactionState = helper.loadState(new Path(result.getDstAbsoluteDir()));
+      if (executionCount != 0) {
+        compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executionCount),
+            Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
+        compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executionCount),
+            Long.toString(executionCount));
+        compactionState.setProp("DuplicateRecordCount" + Long.toString(executionCount),
+            compactionState.getProp("DuplicateRecordCount", "null"));
       }
-      compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
-      compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
-      compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
+      compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
+      compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1));
+      compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
           this.configurator.getConfiguredJob().getJobID().toString());
-      compactState.setProp("DuplicateRecordCount", job.getCounters().findCounter(
-          RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
-      compactState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME, this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
-      helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
-      log.info("duplicated records count for "+ dstPath + " : " + compactState.getProp("DuplicateRecordCount"));
-
+      compactionState.setProp("DuplicateRecordCount",
+          job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+      compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+          this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
+      helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
+      log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp("DuplicateRecordCount"));
 
       log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath,
-          executeCount + 1);
+          executionCount + 1);
 
       // submit events for record count
       if (eventSubmitter != null) {
@@ -176,7 +185,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
             ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
                 CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
                 CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
-                CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
+                CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1),
                 CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
         this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
       }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index ca32ea2..0c63107 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Joiner;
@@ -57,6 +59,7 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
 
   private final State state;
   private EventSubmitter eventSubmitter;
+  private InputRecordCountHelper helper;
 
   public CompactionHiveRegistrationAction (State state) {
     if (!(state instanceof WorkUnitState)) {
@@ -88,10 +91,18 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
       HiveRegistrationPolicy hiveRegistrationPolicy = HiveRegistrationPolicyBase.getPolicy(state);
 
       List<String> paths = new ArrayList<>();
-      for (HiveSpec spec : hiveRegistrationPolicy.getHiveSpecs(new Path(result.getDstAbsoluteDir()))) {
+      Path dstPath = new Path(result.getDstAbsoluteDir());
+      if (state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false)) {
+        //Lazily initialize helper
+        this.helper = new InputRecordCountHelper(state);
+        long executionCount = helper.readExecutionCount(new Path(result.getDstAbsoluteDir()));
+        // Use new output path to do registration
+        dstPath = PathUtils.mergePaths(dstPath, new Path(String.format(CompactionCompleteFileOperationAction.COMPACTION_DIRECTORY_FORMAT, executionCount)));
+      }
+      for (HiveSpec spec : hiveRegistrationPolicy.getHiveSpecs(dstPath)) {
         hiveRegister.register(spec);
         paths.add(spec.getPath().toUri().toASCIIString());
-        log.info("Hive registration is done for {}", result.getDstAbsoluteDir());
+        log.info("Hive registration is done for {}", dstPath.toString());
       }
 
       // submit events for hive registration
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 7f9ef51..145ecf1 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -173,9 +173,43 @@ public class AvroCompactionTaskTest {
     Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10")));
   }
 
+  /** Verifies that with RECOMPACTION_WRITE_TO_NEW_FOLDER enabled, two runs leave both compaction_1 and compaction_2 in place. */
+  @Test
+  public void testAvroRecompactionWriteToNewPath() throws Exception {
+    FileSystem fs = getFileSystem();
+    String basePath = "/tmp/testRecompactionWriteToNewPath";
+    fs.delete(new Path(basePath), true);
+
+    File jobDir = new File(basePath, "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");
+    Assert.assertTrue(jobDir.mkdirs());
+    GenericRecord r1 = createRandomRecord();
+    writeFileWithContent(jobDir, "file1", r1, 20);
+
+    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin ("Recompaction-First", basePath);
+    embeddedGobblin.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, "true");
+    JobExecutionResult result = embeddedGobblin.run();
+    long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path (basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertTrue(result.isSuccessful());
+    Assert.assertEquals(recordCount, 20);
+
+    // Write more avro files to the input dir and re-compact; the second run's own result must be the one asserted.
+    writeFileWithContent(jobDir, "file2", r1, 22);
+    EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin ("Recompaction-Second", basePath);
+    embeddedGobblin_2.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, "true");
+    JobExecutionResult secondResult = embeddedGobblin_2.run();
+    Assert.assertTrue(secondResult.isSuccessful());
+
+    // If the re-compaction succeeded, a new record count should have been written.
+    recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path (basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertEquals(recordCount, 42);
+    // Assert that both the old output and the new output exist.
+    Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10/compaction_1")));
+    Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10/compaction_2")));
+  }
+
   public void testAvroRecompactionWithLimitation() throws Exception {
     FileSystem fs = getFileSystem();
-    String basePath = "/tmp/testRecompaction";
+    String basePath = "/tmp/testRecompactionWithLimitation";
     fs.delete(new Path(basePath), true);
 
     File jobDir = new File(basePath, "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");