Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/22 23:54:49 UTC

[hudi] branch master updated: [HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad2a37d6796 [HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)
ad2a37d6796 is described below

commit ad2a37d6796316129b3c9cc3f92c3b1445c86fda
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Feb 22 15:54:41 2023 -0800

    [HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)
    
    - This PR fixes SparkHoodieBackedTableMetadataWriter so that the rollback of failed writes is not triggered by the async indexer.
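    - For illustration, the guarded rollback introduced in SparkHoodieBackedTableMetadataWriter (see the diff below) looks roughly like this; the identifiers come from the existing code, and the comments are added here only to explain the behavior:
    
          // Roll back partially failed metadata-table writes only when the data table
          // write config uses the EAGER failed-writes cleaning policy. The async indexer,
          // which does not use the eager policy, skips this branch and therefore leaves
          // other inflight delta commits in the metadata table intact.
          if (dataWriteConfig.getFailedWritesCleanPolicy().isEager()
              && writeClient.rollbackFailedWrites()) {
            // A rollback changes the metadata table timeline, so reload the meta client.
            metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
          }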
---
 .../SparkHoodieBackedTableMetadataWriter.java      |  3 +-
 .../apache/hudi/utilities/TestHoodieIndexer.java   | 66 +++++++++++++++++++++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  1 -
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 37222c8266a..e321c641e04 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -150,7 +150,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
       // rollback partially failed writes if any.
-      if (writeClient.rollbackFailedWrites()) {
+      if (dataWriteConfig.getFailedWritesCleanPolicy().isEager()
+          && writeClient.rollbackFailedWrites()) {
         metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
       }
       if (canTriggerTableService) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 67504cb957d..bb4f6abe893 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -158,11 +158,11 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
   }
 
   @Test
-  public void testIndexerWithWriter() throws IOException {
+  public void testIndexerWithWriterFinishingFirst() throws IOException {
     // Test the case where the indexer is running, i.e., the delta commit in the metadata table
     // is inflight, while the regular writer is updating metadata table.
     // The delta commit from the indexer should not be rolled back.
-    String tableName = "indexer_with_writer";
+    String tableName = "indexer_with_writer_finishing_first";
     // Enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder =
         getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
@@ -229,6 +229,68 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
         .stream().findFirst().get().getCommitTime());
   }
 
+  @Test
+  public void testIndexerWithWriterFinishingLast() throws IOException {
+    // Test the case where a regular write updating the metadata table is in progress,
+    // i.e., a delta commit in the metadata table is inflight, and the async indexer
+    // finishes before the original delta commit completes.  In this case, the async indexer should not
+    // trigger the rollback on other inflight writes in the metadata table.
+    String tableName = "indexer_with_writer_finishing_last";
+    // Enable files and bloom_filters on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder =
+        getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+    HoodieMetadataConfig metadataConfig = metadataConfigBuilder.build();
+    upsertToTable(metadataConfig, tableName);
+    upsertToTable(metadataConfig, tableName);
+
+    // Transition the last commit to inflight
+    HoodieInstant commit = metaClient.getActiveTimeline().lastInstant().get();
+    String commitTime = commit.getTimestamp();
+    metaClient.getActiveTimeline().revertToInflight(commit);
+
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
+        context(), metadataConfig, metaClient.getBasePathV2().toString(),
+        getWriteConfigBuilder(basePath(), tableName).build().getSpillableMapBasePath());
+    HoodieTableMetaClient metadataMetaClient = metadata.getMetadataMetaClient();
+    HoodieInstant mdtCommit = metadataMetaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(commitTime))
+        .getInstants().get(0);
+    metadataMetaClient.getActiveTimeline().revertToInflight(mdtCommit);
+
+    // Run async indexer, creating a new indexing instant in the data table and a new delta commit
+    // in the metadata table, with the suffix "004"
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+    config.basePath = basePath();
+    config.tableName = tableName;
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE_AND_EXECUTE;
+    config.propsFilePath = propsPath;
+    config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount);
+    config.configs.add(HoodieMetadataConfig.METADATA_INDEX_CHECK_TIMEOUT_SECONDS + "=1");
+
+    // Start the indexer on the column_stats index
+    HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
+    // The catchup won't finish due to the inflight delta commit, and this is expected
+    assertEquals(-1, indexer.start(0));
+
+    // Now, make sure that the inflight delta commit that happened before the async indexer
+    // run is still intact
+    metaClient = reload(metaClient);
+    metadataMetaClient = reload(metadataMetaClient);
+
+    assertTrue(metaClient.getActiveTimeline().containsInstant(commitTime));
+    assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(commitTime));
+    assertTrue(metaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(commitTime))
+        .getInstants().get(0).isInflight());
+    assertTrue(metadataMetaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(commitTime))
+        .getInstants().get(0).isInflight());
+    assertTrue(metaClient.getActiveTimeline().getRollbackTimeline().empty());
+    assertTrue(metadataMetaClient.getActiveTimeline().getRollbackTimeline().empty());
+  }
+
   private static Stream<Arguments> colStatsFileGroupCountParams() {
     return Stream.of(
         Arguments.of(1),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 03f8392511b..eb6ab80b5f9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1155,7 +1155,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     return config;
   }
 
-  @Disabled("HUDI-5815 for investigation")
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testHoodieIndexer(HoodieRecordType recordType) throws Exception {