You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/08/17 23:18:28 UTC

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3079: [GOBBLIN-1231]Make re-compaction be able to write to a new folder based on the executCount

sv2000 commented on a change in pull request #3079:
URL: https://github.com/apache/incubator-gobblin/pull/3079#discussion_r471820430



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
##########
@@ -144,28 +147,32 @@ public void onCompactionJobComplete(FileSystemDataset dataset) throws IOExceptio
         Counter counter = job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
         newTotalRecords = counter.getValue();
       }
+      final Path finalDstPath = dstPath;
       goodPaths.stream().forEach(p -> {
         String fileName = p.getName();
-        outputFiles.add(new Path(dstPath, fileName));
+        outputFiles.add(new Path(finalDstPath, fileName));
       });
       this.configurator.setDstNewFiles(outputFiles);
 
       State compactState = helper.loadState(new Path(result.getDstAbsoluteDir()));
-      if(executeCount!=0) {
-        compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executeCount), Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
-        compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executeCount), Long.toString(executeCount));
-        compactState.setProp("DuplicateRecordCount" + Long.toString(executeCount), compactState.getProp("DuplicateRecordCount", "null"));
+      if (executeCount != 0) {
+        compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executeCount),

Review comment:
       Can we rename compactState to compactionState?

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
##########
@@ -125,9 +122,15 @@ public void onCompactionJobComplete(FileSystemDataset dataset) throws IOExceptio
         // (all previous run + current run) is possible.
         newTotalRecords = this.configurator.getFileNameRecordCount();
       } else {
-        this.configurator.getOldFiles()
-            .add(this.fs.makeQualified(dstPath).toString());
-        this.fs.delete(dstPath, true);
+        if (state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, false)) {
+          Path oldFilePath = new Path(dstPath, String.format(ConfigurationKeys.COMPACTION_DIRECTORY_FORMAT, executeCount));

Review comment:
       Can we rename executeCount to executionCount?

##########
File path: gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -993,6 +993,13 @@
   public static final String COMPACTION_PRIORITIZER_ALIAS = COMPACTION_PRIORITIZATION_PREFIX + "prioritizerAlias";
   public static final String COMPACTION_ESTIMATOR = COMPACTION_PRIORITIZATION_PREFIX + "estimator";
 
+  /***
+   * Configuration properties related to Re-compaction
+   */
+  public static String COMPACTION_DIRECTORY_FORMAT = "compaction_%s";

Review comment:
       Can we define these constants inside the specific class where they are accessed, instead of in ConfigurationKeys?

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
##########
@@ -144,28 +147,32 @@ public void onCompactionJobComplete(FileSystemDataset dataset) throws IOExceptio
         Counter counter = job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
         newTotalRecords = counter.getValue();
       }
+      final Path finalDstPath = dstPath;
       goodPaths.stream().forEach(p -> {
         String fileName = p.getName();
-        outputFiles.add(new Path(dstPath, fileName));
+        outputFiles.add(new Path(finalDstPath, fileName));
       });
       this.configurator.setDstNewFiles(outputFiles);
 
       State compactState = helper.loadState(new Path(result.getDstAbsoluteDir()));
-      if(executeCount!=0) {
-        compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executeCount), Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
-        compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executeCount), Long.toString(executeCount));
-        compactState.setProp("DuplicateRecordCount" + Long.toString(executeCount), compactState.getProp("DuplicateRecordCount", "null"));
+      if (executeCount != 0) {
+        compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL + Long.toString(executeCount),
+            Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
+        compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executeCount),
+            Long.toString(executeCount));
+        compactState.setProp("DuplicateRecordCount" + Long.toString(executeCount),
+            compactState.getProp("DuplicateRecordCount", "null"));
       }
       compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
       compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
       compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
           this.configurator.getConfiguredJob().getJobID().toString());
-      compactState.setProp("DuplicateRecordCount", job.getCounters().findCounter(
-          RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
-      compactState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME, this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
+      compactState.setProp("DuplicateRecordCount",
+          job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+      compactState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+          this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
       helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
-      log.info("duplicated records count for "+ dstPath + " : " + compactState.getProp("DuplicateRecordCount"));
-
+      log.info("duplicated records count for " + dstPath + " : " + compactState.getProp("DuplicateRecordCount"));

Review comment:
       What happens when executeCount == 0? Will compactState.getProp("DuplicateRecordCount") still return a non-null value in that case?

##########
File path: gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
##########
@@ -173,6 +173,40 @@ public void testAvroRecompaction() throws Exception {
     Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10")));
   }
 
+  @Test
+  public void testAvroRecompactionWriteToNewPath() throws Exception {
+    FileSystem fs = getFileSystem();
+    String basePath = "/tmp/testRecompaction";
+    fs.delete(new Path(basePath), true);

Review comment:
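       Instead of deleting the path inside the test, can we mark the base dir for deletion on exit (e.g. Hadoop's FileSystem#deleteOnExit(Path), since the test uses a Hadoop FileSystem, or java.io.File#deleteOnExit() for a plain local directory)? The base dir seems to be shared with other tests, so if the tests run in multiple threads, deleting it here can cause unpredictable behavior.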




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org