You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/29 16:05:25 UTC

[hudi] branch master updated: [HUDI-4308] READ_OPTIMIZED read mode will temporary loss of data when compaction (#6664)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b48c32f71 [HUDI-4308] READ_OPTIMIZED read mode will temporary loss of data when compaction (#6664)
6b48c32f71 is described below

commit 6b48c32f7133fa51ccd844c2145db84ee0ea9788
Author: aiden.dong <78...@qq.com>
AuthorDate: Fri Sep 30 00:05:18 2022 +0800

    [HUDI-4308] READ_OPTIMIZED read mode will temporary loss of data when compaction (#6664)
    
    Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  45 ++++----
 .../apache/hudi/functional/TestMORDataSource.scala | 119 ++++++++++++++++++++-
 2 files changed, 135 insertions(+), 29 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 0aa9d40e28..8ad2bd6654 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -40,6 +38,9 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -291,31 +292,23 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
 
     validate(activeTimeline, queryInstant);
 
-    if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
-      cachedAllInputFileSlices = partitionFiles.keySet().stream()
-          .collect(Collectors.toMap(
-              Function.identity(),
-              partitionPath ->
-                  queryInstant.map(instant ->
-                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
+    // NOTE: For MOR table, when the compaction is inflight, we need to not only fetch the
+    // latest slices, but also include the base and log files of the second-last version of
+    // the file slice in the same file group as the latest file slice that is under compaction.
+    // This logic is realized by `AbstractTableFileSystemView::getLatestMergedFileSlicesBeforeOrOn`
+    // API.  Note that for COW table, the merging logic of two slices does not happen as there
+    // is no compaction, thus there is no performance impact.
+    cachedAllInputFileSlices = partitionFiles.keySet().stream()
+        .collect(Collectors.toMap(
+                Function.identity(),
+                partitionPath ->
+                    queryInstant.map(instant ->
+                            fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
+                        )
+                        .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
                         .collect(Collectors.toList())
-                  )
-                  .orElse(Collections.emptyList())
-              )
-          );
-    } else {
-      cachedAllInputFileSlices = partitionFiles.keySet().stream()
-         .collect(Collectors.toMap(
-             Function.identity(),
-             partitionPath ->
-                 queryInstant.map(instant ->
-                     fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
-                 )
-                   .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
-                   .collect(Collectors.toList())
-             )
-         );
-    }
+            )
+        );
 
     cachedFileSize = cachedAllInputFileSlices.values().stream()
         .flatMap(Collection::stream)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 87af8c668c..1cdcf9ed23 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -20,19 +20,21 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
@@ -44,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.function.Consumer
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
 
 /**
@@ -978,4 +981,114 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
     assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
     assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20)
   }
+
+  /**
+   * Test read-optimized query on MOR during inflight compaction.
+   *
+   * The following scenario is tested:
+   * Hudi timeline:
+   * > Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+   * > Deltacommit2 (DC2, completed), updating fg1
+   * > Compaction3 (C3, inflight), compacting fg1
+   * > Deltacommit4 (DC4, completed), updating fg1
+   *
+   * On storage, these are the data files for fg1:
+   * file slice v1:
+   * - fg1_dc1.parquet (from DC1)
+   * - .fg1_dc1.log (from DC2)
+   * file slice v2:
+   * - fg1_c3.parquet (from C3, inflight)
+   * - .fg1_c3.log (from DC4)
+   *
+   * The read-optimized query should read `fg1_dc1.parquet` only in this case.
+   */
+  @Test
+  def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = {
+    val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table")
+    val precombineField = "col3"
+    val recordKeyField = "key"
+    val dataField = "age"
+
+    val options = Map[String, String](
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name,
+      DataSourceWriteOptions.OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> precombineField,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeyField,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1")
+
+    // First batch with all inserts
+    // Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+    // fg1_dc1.parquet written to storage
+    // For record key "0", the row is (0, 0, 1000)
+    val firstDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 1000"))
+
+    firstDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Second batch with all updates
+    // Deltacommit2 (DC2, completed), updating fg1
+    // .fg1_dc1.log (from DC2) written to storage
+    // For record key "0", the row is (0, 0, 2000)
+    val secondDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 2000"))
+
+    secondDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Append).save(tablePath)
+
+    val compactionOptions = options ++ Map(
+      HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key -> CompactionTriggerStrategy.NUM_COMMITS.name,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1",
+      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> "false",
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName
+    )
+
+    // Schedule and execute compaction, leaving the compaction inflight
+    // Compaction3 (C3, inflight), compacting fg1
+    // fg1_c3.parquet is written to storage
+    val client = DataSourceUtils.createHoodieClient(
+      spark.sparkContext, "", tablePath, tableName,
+      mapAsJavaMap(compactionOptions)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+
+    val compactionInstant = client.scheduleCompaction(org.apache.hudi.common.util.Option.empty()).get()
+
+    // NOTE: this executes the compaction to write the compacted base files, and leaves the
+    // compaction instant still inflight, emulating a compaction action that is in progress
+    client.compact(compactionInstant)
+    client.close()
+
+    // Third batch with all updates
+    // Deltacommit4 (DC4, completed), updating fg1
+    // .fg1_c3.log (from DC4) is written to storage
+    // For record key "0", the row is (0, 0, 3000)
+    val thirdDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 3000"))
+
+    thirdDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Append).save(tablePath)
+
+    // Read-optimized query on MOR
+    val roDf = spark.read.format("org.apache.hudi")
+      .option(
+        DataSourceReadOptions.QUERY_TYPE.key,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(tablePath)
+
+    // The base file in the first file slice, i.e., fg1_dc1.parquet, should be read only
+    assertEquals(10, roDf.count())
+    assertEquals(
+      1000L,
+      roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
+  }
 }