You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/22 23:54:49 UTC
[hudi] branch master updated: [HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ad2a37d6796 [HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)
ad2a37d6796 is described below
commit ad2a37d6796316129b3c9cc3f92c3b1445c86fda
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Feb 22 15:54:41 2023 -0800
[HUDI-5817] Fix async indexer metadata writer to avoid eager rollback and failed write cleaning (#8001)
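- This PR fixes SparkHoodieBackedTableMetadataWriter so that it rolls back failed writes in the metadata table only when the failed-writes cleaning policy of the data table write config is eager. As a result, the async indexer no longer triggers a rollback of other inflight writes in the metadata table.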
---
.../SparkHoodieBackedTableMetadataWriter.java | 3 +-
.../apache/hudi/utilities/TestHoodieIndexer.java | 66 +++++++++++++++++++++-
.../deltastreamer/TestHoodieDeltaStreamer.java | 1 -
3 files changed, 66 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 37222c8266a..e321c641e04 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -150,7 +150,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
// rollback partially failed writes if any.
- if (writeClient.rollbackFailedWrites()) {
+ if (dataWriteConfig.getFailedWritesCleanPolicy().isEager()
+ && writeClient.rollbackFailedWrites()) {
metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
}
if (canTriggerTableService) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 67504cb957d..bb4f6abe893 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -158,11 +158,11 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
}
@Test
- public void testIndexerWithWriter() throws IOException {
+ public void testIndexerWithWriterFinishingFirst() throws IOException {
// Test the case where the indexer is running, i.e., the delta commit in the metadata table
// is inflight, while the regular writer is updating metadata table.
// The delta commit from the indexer should not be rolled back.
- String tableName = "indexer_with_writer";
+ String tableName = "indexer_with_writer_finishing_first";
// Enable files and bloom_filters on the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder =
getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
@@ -229,6 +229,68 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
.stream().findFirst().get().getCommitTime());
}
+ @Test
+ public void testIndexerWithWriterFinishingLast() throws IOException {
+ // Test the case where a regular write updating the metadata table is still in progress,
+ // i.e., its delta commit in the metadata table is inflight, while the async indexer
+ // runs before that write finishes. In this case, the async indexer must not
+ // trigger a rollback of other inflight writes in the metadata table.
+ String tableName = "indexer_with_writer_finishing_last";
+ // Enable files and bloom_filters on the regular write client
+ HoodieMetadataConfig.Builder metadataConfigBuilder =
+ getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+ HoodieMetadataConfig metadataConfig = metadataConfigBuilder.build();
+ upsertToTable(metadataConfig, tableName);
+ upsertToTable(metadataConfig, tableName);
+
+ // Revert the last commit in the data table to inflight to simulate an ongoing write
+ HoodieInstant commit = metaClient.getActiveTimeline().lastInstant().get();
+ String commitTime = commit.getTimestamp();
+ metaClient.getActiveTimeline().revertToInflight(commit);
+
+ HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
+ context(), metadataConfig, metaClient.getBasePathV2().toString(),
+ getWriteConfigBuilder(basePath(), tableName).build().getSpillableMapBasePath());
+ HoodieTableMetaClient metadataMetaClient = metadata.getMetadataMetaClient();
+ HoodieInstant mdtCommit = metadataMetaClient.getActiveTimeline()
+ .filter(i -> i.getTimestamp().equals(commitTime))
+ .getInstants().get(0);
+ metadataMetaClient.getActiveTimeline().revertToInflight(mdtCommit);
+
+ // Run async indexer, creating a new indexing instant in the data table and a new delta commit
+ // in the metadata table, with the suffix "004"
+ HoodieIndexer.Config config = new HoodieIndexer.Config();
+ String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+ config.basePath = basePath();
+ config.tableName = tableName;
+ config.indexTypes = COLUMN_STATS.name();
+ config.runningMode = SCHEDULE_AND_EXECUTE;
+ config.propsFilePath = propsPath;
+ config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount);
+ config.configs.add(HoodieMetadataConfig.METADATA_INDEX_CHECK_TIMEOUT_SECONDS.key() + "=1");
+
+ // Start the indexer to build the column_stats index
+ HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
+ // The catch-up cannot finish because of the inflight delta commit, so -1 is expected
+ assertEquals(-1, indexer.start(0));
+
+ // Verify that the inflight commit created before the async indexer ran is left intact
+ // in both timelines, and that no rollback was triggered
+ metaClient = reload(metaClient);
+ metadataMetaClient = reload(metadataMetaClient);
+
+ assertTrue(metaClient.getActiveTimeline().containsInstant(commitTime));
+ assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(commitTime));
+ assertTrue(metaClient.getActiveTimeline()
+ .filter(i -> i.getTimestamp().equals(commitTime))
+ .getInstants().get(0).isInflight());
+ assertTrue(metadataMetaClient.getActiveTimeline()
+ .filter(i -> i.getTimestamp().equals(commitTime))
+ .getInstants().get(0).isInflight());
+ assertTrue(metaClient.getActiveTimeline().getRollbackTimeline().empty());
+ assertTrue(metadataMetaClient.getActiveTimeline().getRollbackTimeline().empty());
+ }
+
private static Stream<Arguments> colStatsFileGroupCountParams() {
return Stream.of(
Arguments.of(1),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 03f8392511b..eb6ab80b5f9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1155,7 +1155,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
return config;
}
- @Disabled("HUDI-5815 for investigation")
@ParameterizedTest
@EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
public void testHoodieIndexer(HoodieRecordType recordType) throws Exception {