You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/05/18 23:00:02 UTC

[gobblin] branch master updated: [GOBBLIN-1649] Revert gobblin-1633 (#3510)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new deb4f4860 [GOBBLIN-1649] Revert gobblin-1633 (#3510)
deb4f4860 is described below

commit deb4f4860bf1b6a9db6db5fb608a036fdab40bce
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Wed May 18 15:59:57 2022 -0700

    [GOBBLIN-1649] Revert gobblin-1633 (#3510)
---
 .../CompactionCompleteFileOperationAction.java     |  3 +++
 .../compaction/mapreduce/MRCompactionTask.java     | 28 +---------------------
 2 files changed, 4 insertions(+), 27 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index d1fd3f796..f0f900d0c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -182,6 +183,8 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
           this.configurator.getConfiguredJob().getJobID().toString());
       compactionState.setProp(DUPLICATE_COUNT_TOTAL,
           job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+      compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+          this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
       helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
       log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp(DUPLICATE_COUNT_TOTAL));
 
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index ce885f0cf..952e46aed 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -30,20 +30,15 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
-import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
 import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.mapreduce.MRTask;
-import org.apache.hadoop.fs.Path;
 
 
 
@@ -106,8 +101,7 @@ public class MRCompactionTask extends MRTask {
   public void onMRTaskComplete (boolean isSuccess, Throwable throwable) {
     if (isSuccess) {
       try {
-        TaskState taskState = taskContext.getTaskState();
-        setCounterInfo(taskState);
+        setCounterInfo(taskContext.getTaskState());
 
         List<CompactionCompleteAction> actions = this.suite.getCompactionCompleteActions();
         for (CompactionCompleteAction action: actions) {
@@ -115,10 +109,6 @@ public class MRCompactionTask extends MRTask {
           action.onCompactionJobComplete(dataset);
         }
         submitEvent(CompactionSlaEventHelper.COMPACTION_COMPLETED_EVENT_NAME);
-        if (dataset instanceof FileSystemDataset) {
-          commitRunStartTimeInfo(taskState, (FileSystemDataset) dataset);
-        }
-
         super.onMRTaskComplete(true, null);
       } catch (IOException e) {
         submitEvent(CompactionSlaEventHelper.COMPACTION_FAILED_EVENT_NAME);
@@ -130,22 +120,6 @@ public class MRCompactionTask extends MRTask {
     }
   }
 
-  /**
-   * Persist the run start time which is used to determine when the last successful compaction run started. This
-   * value is useful for limiting how often you recompact by verifying whether a dataset has recently been compacted.
-   * @param taskState
-   * @param dataset
-   * @throws IOException
-   */
-  private static void commitRunStartTimeInfo(TaskState taskState, FileSystemDataset dataset) throws IOException {
-    CompactionPathParser.CompactionParserResult result = new CompactionPathParser(taskState).parse(dataset);
-    InputRecordCountHelper helper = new InputRecordCountHelper(taskState);
-    State compactionState = helper.loadState(new Path(result.getDstAbsoluteDir()));
-    compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
-        taskState.getProp(CompactionSource.COMPACTION_INIT_TIME));
-    helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
-  }
-
   private void setCounterInfo(TaskState taskState)
       throws IOException {