You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/30 05:33:35 UTC
[hudi] 05/14: [HUDI-1346] Choose a new instant time when performing autoClean.
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a2dcacf7b370096e5a65dcefdbe9dcc7cdd80078
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Oct 16 18:15:31 2020 -0700
[HUDI-1346] Choose a new instant time when performing autoClean.
---
.../org/apache/hudi/client/AsyncCleanerService.java | 7 ++++---
.../org/apache/hudi/client/HoodieWriteClient.java | 19 ++++++++++---------
.../org/apache/hudi/metadata/TestHoodieMetadata.java | 5 ++++-
3 files changed, 18 insertions(+), 13 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
index 6367e79..331948d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client;
import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
@@ -52,11 +53,11 @@ class AsyncCleanerService extends AbstractAsyncService {
}), executor);
}
- public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
- String instantTime) {
+ public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
- LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
asyncCleanerService.start(null);
} else {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index a0019b0..614f0dc 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -193,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -214,7 +214,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -233,7 +233,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
return postWrite(result, instantTime, table);
}
@@ -253,7 +253,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -293,7 +293,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, userDefinedBulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -319,7 +319,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -335,7 +335,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
HoodieWriteMetadata result = table.insertOverwrite(jsc, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@@ -425,8 +425,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
AsyncCleanerService.waitForCompletion(asyncCleanerService);
LOG.info("Cleaner has finished");
} else {
+ // Do not reuse instantTime for the clean: the metadata table requires every change to have a unique instant timestamp.
LOG.info("Auto cleaning is enabled. Running cleaner now");
- clean(instantTime);
+ clean();
}
}
}
@@ -569,7 +570,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* cleaned)
*/
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
- LOG.info("Cleaner started");
+ LOG.info("Cleaner started for instant time " + cleanInstantTime);
final Timer.Context context = metrics.getCleanCtx();
HoodieCleanMetadata metadata = getTable().clean(jsc, cleanInstantTime);
if (context != null && metadata != null) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 9395d5f..751e2ab 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
@@ -457,6 +458,8 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
init(HoodieTableType.COPY_ON_WRITE);
final int maxDeltaCommitsBeforeCompaction = 6;
+ // Auto-clean is always enabled below; this randomly chosen flag decides whether it runs asynchronously or synchronously in this run.
+ boolean asyncClean = new Random().nextBoolean();
HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
.withMetadataCompactionConfig(HoodieCompactionConfig.newBuilder()
.archiveCommitsWith(2, 4).retainCommits(1).retainFileVersions(1).withAutoClean(true)
@@ -464,7 +467,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
.withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3)
- .retainCommits(1).retainFileVersions(1).withAutoClean(false).build())
+ .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build())
.build();
List<HoodieRecord> records;
HoodieTableMetaClient metaClient = ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, true);