You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/04/13 17:15:23 UTC

[hudi] branch master updated: [HUDI-5990] Avoid missing data during incremental queries (#8299)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c91d7e1f78d [HUDI-5990] Avoid missing data during incremental queries (#8299)
c91d7e1f78d is described below

commit c91d7e1f78dbb4a12dab23b5d4b147bfb150002a
Author: rfyu <39...@users.noreply.github.com>
AuthorDate: Fri Apr 14 01:15:13 2023 +0800

    [HUDI-5990] Avoid missing data during incremental queries (#8299)
    
    The reason for the missing data is that the timeline used by
    `MergeOnReadIncrementalRelation` only contains completed
    instants. When the incremental range contains an incomplete
    compaction plan, `fsView.getLatestMergedFileSlicesBeforeOrOn`
    in `collectFileSplits` will filter out some file slices.
---
 .../hudi/MergeOnReadIncrementalRelation.scala      |  4 +-
 .../functional/TestParquetColumnProjection.scala   | 75 ++++++++++++++++++++--
 2 files changed, 73 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 93bf730a56d..636624f3950 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -60,9 +60,9 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
 
   override protected def timeline: HoodieTimeline = {
     if (fullTableScan) {
-      super.timeline
+      metaClient.getCommitsAndCompactionTimeline
     } else {
-      super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+      metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp, endTimestamp)
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 0eefc7beeec..eaf1839d5dc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -22,12 +22,13 @@ import org.apache.calcite.runtime.SqlFunctions.abs
 import org.apache.hudi.HoodieBaseRelation.projectSchema
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieMergeOnReadRDD, HoodieSparkUtils, HoodieUnsafeRDD}
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
@@ -252,7 +253,6 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
   }
 
-  // TODO add test for incremental query of the table with logs
   @Test
   def testMergeOnReadIncrementalRelationWithNoDeltaLogs(): Unit = {
     val tablePath = s"$basePath/mor-no-logs"
@@ -296,6 +296,41 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
       projectedColumnsReadStats, incrementalOpts)
   }
 
+  @Test
+  def testMergeOnReadIncrementalRelationWithDeltaLogs(): Unit = {
+    val tablePath = s"$basePath/mor-with-logs-incr"
+    val targetRecordsCount = 100
+
+    bootstrapMORTableWithDeltaLog(tablePath, targetRecordsCount, defaultWriteOpts, populateMetaFields = true)
+
+    println(s"Running test for $tablePath / incremental")
+    /**
+     * Expected timeline after bootstrapMORTableWithDeltaLog and the data written by each delta commit; compaction.request is presumably the pending (incomplete) compaction plan this test targets - confirm
+     * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+     * | timeline     | deltacommit1 | deltacommit2 | deltacommit3 | compaction.request | deltacommit4 | deltacommit5 | deltacommit6 |
+     * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+     * | updated data |      001     |      002     |      003     |                    |      004     |      005     |      006     |
+     * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+     */
+    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build()
+    val completedCommits = hoodieMetaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+    val startUnarchivedCommitTs = completedCommits.nthInstant(1).get().getTimestamp // index 1 of the completed instants: deltacommit2 in the table above
+    val endUnarchivedCommitTs = completedCommits.nthInstant(5).get().getTimestamp // index 5 of the completed instants: deltacommit6 in the table above
+
+    val readOpts = defaultWriteOpts ++ Map(
+      "path" -> tablePath,
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startUnarchivedCommitTs,
+      DataSourceReadOptions.END_INSTANTTIME.key -> endUnarchivedCommitTs
+    )
+
+    val inputDf = spark.read.format("hudi")
+      .options(readOpts)
+      .load()
+    val commitNum = inputDf.select("rider").distinct().collect().length
+    assertTrue(commitNum > 1)
+  }
+
   // Test routine
   private def runTest(tableState: TableState,
                       queryType: String,
@@ -403,6 +438,38 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     }
   }
 
+  private def bootstrapMORTableWithDeltaLog(path: String,
+                                recordCount: Int,
+                                opts: Map[String, String],
+                                populateMetaFields: Boolean,
+                                dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+    val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
+
+    // Step 1: Bootstrap a MOR table at `path` with `recordCount` records via bootstrapTable (through bulk-insert)
+    val (insertedRecords, schema) = bootstrapTable(path, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, recordCount, opts, populateMetaFields, Some(dataGen))
+
+    for (i <- 2 to 6) {
+      val updatesCount = (insertedRecords.length * 0.5).toInt
+      val recordsToUpdate = scala.util.Random.shuffle(insertedRecords).take(updatesCount)
+      val updatedRecords = dataGen.generateUpdates("%03d".format(i), recordsToUpdate.asJava)
+
+      // Step 2: Upsert updates for a random half of the inserted records (one write per round, rounds 2 to 6), with inline compaction scheduling enabled
+      val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA)
+
+      inputDF.write.format("org.apache.hudi")
+        .options(opts)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+        .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, "true")
+        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "3")
+        .option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
+        .mode(SaveMode.Append)
+        .save(path)
+    }
+
+    (insertedRecords, schema)
+  }
+
   def measureBytesRead[T](f: () => T): (T, Int) = {
     // Init BenchmarkCounter to report number of bytes actually read from the Block
     BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter, fs.getConf)