Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/29 16:05:25 UTC
[hudi] branch master updated: [HUDI-4308] READ_OPTIMIZED read mode may temporarily lose data during compaction (#6664)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6b48c32f71 [HUDI-4308] READ_OPTIMIZED read mode may temporarily lose data during compaction (#6664)
6b48c32f71 is described below
commit 6b48c32f7133fa51ccd844c2145db84ee0ea9788
Author: aiden.dong <78...@qq.com>
AuthorDate: Fri Sep 30 00:05:18 2022 +0800
[HUDI-4308] READ_OPTIMIZED read mode may temporarily lose data during compaction (#6664)
Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 45 ++++----
.../apache/hudi/functional/TestMORDataSource.scala | 119 ++++++++++++++++++++-
2 files changed, 135 insertions(+), 29 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 0aa9d40e28..8ad2bd6654 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -18,8 +18,6 @@
package org.apache.hudi;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -40,6 +38,9 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -291,31 +292,23 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
validate(activeTimeline, queryInstant);
- if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
- cachedAllInputFileSlices = partitionFiles.keySet().stream()
- .collect(Collectors.toMap(
- Function.identity(),
- partitionPath ->
- queryInstant.map(instant ->
- fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
+ // NOTE: For MOR table, when the compaction is inflight, we need to not only fetch the
+ // latest slices, but also include the base and log files of the second-last version of
+ // the file slice in the same file group as the latest file slice that is under compaction.
+ // This logic is realized by `AbstractTableFileSystemView::getLatestMergedFileSlicesBeforeOrOn`
+ // API. Note that for COW table, the merging logic of two slices does not happen as there
+ // is no compaction, thus there is no performance impact.
+ cachedAllInputFileSlices = partitionFiles.keySet().stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ partitionPath ->
+ queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
+ )
+ .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
- )
- .orElse(Collections.emptyList())
- )
- );
- } else {
- cachedAllInputFileSlices = partitionFiles.keySet().stream()
- .collect(Collectors.toMap(
- Function.identity(),
- partitionPath ->
- queryInstant.map(instant ->
- fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
- )
- .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
- .collect(Collectors.toList())
- )
- );
- }
+ )
+ );
cachedFileSize = cachedAllInputFileSlices.values().stream()
.flatMap(Collection::stream)
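To make the NOTE in the hunk above concrete, here is a minimal sketch of what a "merged" file slice means while a compaction is inflight. The case class and helper names (FileSliceLite, mergedLatestSlice) are hypothetical and only illustrate the idea; they are not Hudi's API.

  // Hypothetical, simplified model of a file slice for illustration only.
  case class FileSliceLite(baseInstant: String,
                           baseFile: Option[String],   // e.g. Some("fg1_dc1.parquet")
                           logFiles: List[String],     // e.g. List(".fg1_dc1.log")
                           compactionPending: Boolean)

  // If the latest slice was produced by a still-inflight compaction, fall back to the previous
  // committed slice for the base file and carry the log files of both slices, so that neither a
  // snapshot nor a read-optimized query observes a half-written file group.
  def mergedLatestSlice(latest: FileSliceLite, previous: Option[FileSliceLite]): FileSliceLite =
    previous match {
      case Some(prev) if latest.compactionPending =>
        prev.copy(logFiles = prev.logFiles ++ latest.logFiles)
      case _ => latest
    }

  // A read-optimized query then reads only mergedLatestSlice(...).baseFile, i.e. the last
  // committed base file (fg1_dc1.parquet in the test below), never the uncommitted one.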
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 87af8c668c..1cdcf9ed23 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -20,19 +20,21 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
@@ -44,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import java.util.function.Consumer
+import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
/**
@@ -978,4 +981,114 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20)
}
+
+ /**
+ * Test read-optimized query on MOR during inflight compaction.
+ *
+ * The following scenario is tested:
+ * Hudi timeline:
+ * > Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+ * > Deltacommit2 (DC2, completed), updating fg1
+ * > Compaction3 (C3, inflight), compacting fg1
+ * > Deltacommit4 (DC4, completed), updating fg1
+ *
+ * On storage, these are the data files for fg1:
+ * file slice v1:
+ * - fg1_dc1.parquet (from DC1)
+ * - .fg1_dc1.log (from DC2)
+ * file slice v2:
+ * - fg1_c3.parquet (from C3, inflight)
+ * - .fg1_c3.log (from DC4)
+ *
+ * The read-optimized query should read `fg1_dc1.parquet` only in this case.
+ */
+ @Test
+ def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = {
+ val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table")
+ val precombineField = "col3"
+ val recordKeyField = "key"
+ val dataField = "age"
+
+ val options = Map[String, String](
+ DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name,
+ DataSourceWriteOptions.OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> precombineField,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeyField,
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+ HoodieWriteConfig.TBL_NAME.key -> tableName,
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1")
+
+ // First batch with all inserts
+ // Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+ // fg1_dc1.parquet written to storage
+ // For record key "0", the row is (0, 0, 1000)
+ val firstDf = spark.range(0, 10).toDF(recordKeyField)
+ .withColumn(precombineField, expr(recordKeyField))
+ .withColumn(dataField, expr(recordKeyField + " + 1000"))
+
+ firstDf.write.format("hudi")
+ .options(options)
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ // Second batch with all updates
+ // Deltacommit2 (DC2, completed), updating fg1
+ // .fg1_dc1.log (from DC2) written to storage
+ // For record key "0", the row is (0, 0, 2000)
+ val secondDf = spark.range(0, 10).toDF(recordKeyField)
+ .withColumn(precombineField, expr(recordKeyField))
+ .withColumn(dataField, expr(recordKeyField + " + 2000"))
+
+ secondDf.write.format("hudi")
+ .options(options)
+ .mode(SaveMode.Append).save(tablePath)
+
+ val compactionOptions = options ++ Map(
+ HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key -> CompactionTriggerStrategy.NUM_COMMITS.name,
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1",
+ DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> "false",
+ HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName
+ )
+
+ // Schedule and execute compaction, leaving the compaction inflight
+ // Compaction3 (C3, inflight), compacting fg1
+ // fg1_c3.parquet is written to storage
+ val client = DataSourceUtils.createHoodieClient(
+ spark.sparkContext, "", tablePath, tableName,
+ mapAsJavaMap(compactionOptions)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+
+ val compactionInstant = client.scheduleCompaction(org.apache.hudi.common.util.Option.empty()).get()
+
+ // NOTE: this executes the compaction to write the compacted base files, and leaves the
+ // compaction instant still inflight, emulating a compaction action that is in progress
+ client.compact(compactionInstant)
+ client.close()
+
+ // Third batch with all updates
+ // Deltacommit4 (DC4, completed), updating fg1
+ // .fg1_c3.log (from DC4) is written to storage
+ // For record key "0", the row is (0, 0, 3000)
+ val thirdDf = spark.range(0, 10).toDF(recordKeyField)
+ .withColumn(precombineField, expr(recordKeyField))
+ .withColumn(dataField, expr(recordKeyField + " + 3000"))
+
+ thirdDf.write.format("hudi")
+ .options(options)
+ .mode(SaveMode.Append).save(tablePath)
+
+ // Read-optimized query on MOR
+ val roDf = spark.read.format("org.apache.hudi")
+ .option(
+ DataSourceReadOptions.QUERY_TYPE.key,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(tablePath)
+
+ // The base file in the first file slice, i.e., fg1_dc1.parquet, should be read only
+ assertEquals(10, roDf.count())
+ assertEquals(
+ 1000L,
+ roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
+ }
}
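For contrast with the read-optimized assertion in the test above, a snapshot query on the same table would surface the latest committed update from DC4 (value 3000 for key "0", per the scenario comments). A minimal sketch, assuming the table written by the test (same tablePath, "key" and "age" columns); the expected values are inferred from the scenario described in the scaladoc:

  // Read-optimized: only the base file of the merged latest slice is read -> fg1_dc1.parquet -> 1000.
  val roDf = spark.read.format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
    .load(tablePath)

  // Snapshot: base and log files are merged on read -> latest committed value from DC4 -> 3000.
  val snapshotDf = spark.read.format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
    .load(tablePath)

  assertEquals(1000L, roDf.where(col("key") === 0).select("age").collect()(0).getLong(0))
  assertEquals(3000L, snapshotDf.where(col("key") === 0).select("age").collect()(0).getLong(0))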