Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/14 06:29:21 UTC

[hudi] branch master updated: [HUDI-4817] Delete markers after full-record bootstrap operation (#6667)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 39dba79640 [HUDI-4817] Delete markers after full-record bootstrap operation (#6667)
39dba79640 is described below

commit 39dba796405d0bc62a1cdd4818fcd6205012c608
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Sep 13 23:29:13 2022 -0700

    [HUDI-4817] Delete markers after full-record bootstrap operation (#6667)
---
 .../SparkBootstrapCommitActionExecutor.java        | 29 +++++++++++++++-------
 .../org/apache/hudi/functional/TestBootstrap.java  | 12 +++++++--
 2 files changed, 30 insertions(+), 11 deletions(-)
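
Summary of the change: previously the marker directory was deleted once, after both bootstrap phases had run, keyed on the executor's instantTime. The metadata-only and full-record phases write their markers under the reserved bootstrap instant timestamps, so the cleanup now runs inside each phase with the matching timestamp, which also covers the bulk insert performed by the full-record bootstrap. A minimal sketch of the cleanup call each phase now makes, with import paths assumed from the Hudi source tree of this period and all surrounding setup elided:

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;

final class MarkerCleanup {
  // Deletes the marker directory for the given instant. The "quiet" variant
  // is expected to log and swallow failures, so a cleanup problem cannot
  // fail an otherwise successful bootstrap commit.
  static void deleteMarkers(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> table,
                            HoodieEngineContext context, String instantTime) {
    WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
  }
}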

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 4e488047d8..b70c31f925 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -124,9 +124,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
       Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
       // if there are full bootstrap to be performed, perform that too
       Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
-      // Delete the marker directory for the instant
-      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
-          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
       return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
@@ -148,17 +146,22 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
     }
 
     HoodieTableMetaClient metaClient = table.getMetaClient();
+    String bootstrapInstantTime = HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
     metaClient.getActiveTimeline().createNewInstant(
-        new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
-            HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
+        new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(), bootstrapInstantTime));
 
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
-        metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
+        metaClient.getCommitActionType(), bootstrapInstantTime), Option.empty());
 
     HoodieData<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
 
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
+
+    // Delete the marker directory for the instant
+    WriteMarkersFactory.get(config.getMarkersType(), table, bootstrapInstantTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
     return Option.of(result);
   }
 
@@ -265,12 +268,20 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
         (JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
             partitionFilesList);
     // Start Full Bootstrap
-    final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
-        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
+    String bootstrapInstantTime = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+    final HoodieInstant requested = new HoodieInstant(
+        State.REQUESTED, table.getMetaClient().getCommitActionType(), bootstrapInstantTime);
     table.getActiveTimeline().createNewInstant(requested);
 
     // Setup correct schema and run bulk insert.
-    return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
+    Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> writeMetadataOption =
+        Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
+
+    // Delete the marker directory for the instant
+    WriteMarkersFactory.get(config.getMarkersType(), table, bootstrapInstantTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
+    return writeMetadataOption;
   }
 
   protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
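
Both phases now clean up under their own reserved timestamps (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS and HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS) instead of the outer instantTime. In terms of the hypothetical helper sketched above, the calls this diff adds amount to:

// after the metadata-only bootstrap commits:
MarkerCleanup.deleteMarkers(config, table, context, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
// after the full-record bulk insert completes:
MarkerCleanup.deleteMarkers(config, table, context, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);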
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 6b54765a0b..93b25f8a65 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -62,6 +62,7 @@ import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -346,6 +347,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
     assertEquals(instant, metaClient.getActiveTimeline()
         .getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+    verifyNoMarkerInTempFolder();
 
     Dataset<Row> bootstrapped = sqlContext.read().format("parquet").load(basePath);
     Dataset<Row> original = sqlContext.read().format("parquet").load(bootstrapBasePath);
@@ -463,7 +465,7 @@ public class TestBootstrap extends HoodieClientTestBase {
         jsc.hadoopConfiguration(),
         FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
-        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
     assertEquals(totalRecords, records.size());
     for (GenericRecord r : records) {
@@ -473,6 +475,12 @@ public class TestBootstrap extends HoodieClientTestBase {
     assertEquals(totalRecords, seenKeys.size());
   }
 
+  private void verifyNoMarkerInTempFolder() throws IOException {
+    String tempFolderPath = metaClient.getTempFolderPath();
+    FileSystem fileSystem = FSUtils.getFs(tempFolderPath, jsc.hadoopConfiguration());
+    assertEquals(0, fileSystem.listStatus(new Path(tempFolderPath)).length);
+  }
+
   public static class TestFullBootstrapDataProvider extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {
 
     public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineContext context) {
@@ -481,7 +489,7 @@ public class TestBootstrap extends HoodieClientTestBase {
 
     @Override
     public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
-        List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
+                                                      List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
       String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream())
           .findAny().get().getPath()).toString();
       ParquetFileReader reader = null;
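
The new verifyNoMarkerInTempFolder() test helper asserts that the table's temp folder, where marker directories are created, is empty once the bootstrap commits complete. A standalone sketch of the same check, assuming the default .hoodie/.temp layout under the table base path (the class and method here are hypothetical):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class MarkerCheck {
  // Returns true when no marker directories remain under the table's temp
  // folder; an absent folder is treated the same as an empty one.
  public static boolean tempFolderEmpty(String basePath, Configuration conf) throws IOException {
    Path tempFolder = new Path(basePath, ".hoodie/.temp");
    FileSystem fs = tempFolder.getFileSystem(conf);
    return !fs.exists(tempFolder) || fs.listStatus(tempFolder).length == 0;
  }
}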