You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/30 05:33:35 UTC

[hudi] 05/14: [HUDI-1346] Choose a new instant time when performing autoClean.

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a2dcacf7b370096e5a65dcefdbe9dcc7cdd80078
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Oct 16 18:15:31 2020 -0700

    [HUDI-1346] Choose a new instant time when performing autoClean.
---
 .../org/apache/hudi/client/AsyncCleanerService.java   |  7 ++++---
 .../org/apache/hudi/client/HoodieWriteClient.java     | 19 ++++++++++---------
 .../org/apache/hudi/metadata/TestHoodieMetadata.java  |  5 ++++-
 3 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
index 6367e79..331948d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
@@ -52,11 +53,11 @@ class AsyncCleanerService extends AbstractAsyncService {
     }), executor);
   }
 
-  public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
-                                                                String instantTime) {
+  public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient) {
     AsyncCleanerService asyncCleanerService = null;
     if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
-      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
+      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
       asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
       asyncCleanerService.start(null);
     } else {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index a0019b0..614f0dc 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -193,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -214,7 +214,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -233,7 +233,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
     return postWrite(result, instantTime, table);
   }
@@ -253,7 +253,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -293,7 +293,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, userDefinedBulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -319,7 +319,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -335,7 +335,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insertOverwrite(jsc, instantTime, records);
     return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
   }
@@ -425,8 +425,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
         AsyncCleanerService.waitForCompletion(asyncCleanerService);
         LOG.info("Cleaner has finished");
       } else {
+        // Do not reuse instantTime for the clean, as the metadata table requires every change to have a unique instant timestamp.
         LOG.info("Auto cleaning is enabled. Running cleaner now");
-        clean(instantTime);
+        clean();
       }
     }
   }
@@ -569,7 +570,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * cleaned)
    */
   public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
-    LOG.info("Cleaner started");
+    LOG.info("Cleaner started for instant time " + cleanInstantTime);
     final Timer.Context context = metrics.getCleanCtx();
     HoodieCleanMetadata metadata = getTable().clean(jsc, cleanInstantTime);
     if (context != null && metadata != null) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 9395d5f..751e2ab 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -457,6 +458,8 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
     init(HoodieTableType.COPY_ON_WRITE);
 
     final int maxDeltaCommitsBeforeCompaction = 6;
+    // Auto clean is always enabled below; randomly choose whether it runs async or inline so both paths get exercised.
+    boolean asyncClean = new Random().nextBoolean();
     HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
         .withMetadataCompactionConfig(HoodieCompactionConfig.newBuilder()
             .archiveCommitsWith(2, 4).retainCommits(1).retainFileVersions(1).withAutoClean(true)
@@ -464,7 +467,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
             .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
             .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3)
-            .retainCommits(1).retainFileVersions(1).withAutoClean(false).build())
+            .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build())
         .build();
     List<HoodieRecord> records;
     HoodieTableMetaClient metaClient = ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, true);