You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/05/18 23:00:02 UTC
[gobblin] branch master updated: [GOBBLIN-1649] Revert gobblin-1633 (#3510)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new deb4f4860 [GOBBLIN-1649] Revert gobblin-1633 (#3510)
deb4f4860 is described below
commit deb4f4860bf1b6a9db6db5fb608a036fdab40bce
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Wed May 18 15:59:57 2022 -0700
[GOBBLIN-1649] Revert gobblin-1633 (#3510)
---
.../CompactionCompleteFileOperationAction.java | 3 +++
.../compaction/mapreduce/MRCompactionTask.java | 28 +---------------------
2 files changed, 4 insertions(+), 27 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index d1fd3f796..f0f900d0c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -182,6 +183,8 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
this.configurator.getConfiguredJob().getJobID().toString());
compactionState.setProp(DUPLICATE_COUNT_TOTAL,
job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+ compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+ this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp(DUPLICATE_COUNT_TOTAL));
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index ce885f0cf..952e46aed 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -30,20 +30,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
-import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.suite.CompactionSuite;
import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.mapreduce.MRTask;
-import org.apache.hadoop.fs.Path;
@@ -106,8 +101,7 @@ public class MRCompactionTask extends MRTask {
public void onMRTaskComplete (boolean isSuccess, Throwable throwable) {
if (isSuccess) {
try {
- TaskState taskState = taskContext.getTaskState();
- setCounterInfo(taskState);
+ setCounterInfo(taskContext.getTaskState());
List<CompactionCompleteAction> actions = this.suite.getCompactionCompleteActions();
for (CompactionCompleteAction action: actions) {
@@ -115,10 +109,6 @@ public class MRCompactionTask extends MRTask {
action.onCompactionJobComplete(dataset);
}
submitEvent(CompactionSlaEventHelper.COMPACTION_COMPLETED_EVENT_NAME);
- if (dataset instanceof FileSystemDataset) {
- commitRunStartTimeInfo(taskState, (FileSystemDataset) dataset);
- }
-
super.onMRTaskComplete(true, null);
} catch (IOException e) {
submitEvent(CompactionSlaEventHelper.COMPACTION_FAILED_EVENT_NAME);
@@ -130,22 +120,6 @@ public class MRCompactionTask extends MRTask {
}
}
- /**
- * Persist the run start time which is used to determine when the last successful compaction run started. This
- * value is useful for limiting how often you recompact by verifying whether a dataset has recently been compacted.
- * @param taskState
- * @param dataset
- * @throws IOException
- */
- private static void commitRunStartTimeInfo(TaskState taskState, FileSystemDataset dataset) throws IOException {
- CompactionPathParser.CompactionParserResult result = new CompactionPathParser(taskState).parse(dataset);
- InputRecordCountHelper helper = new InputRecordCountHelper(taskState);
- State compactionState = helper.loadState(new Path(result.getDstAbsoluteDir()));
- compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
- taskState.getProp(CompactionSource.COMPACTION_INIT_TIME));
- helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
- }
-
private void setCounterInfo(TaskState taskState)
throws IOException {