You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/04/13 17:15:23 UTC
[hudi] branch master updated: [HUDI-5990] Avoid missing data during incremental queries (#8299)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c91d7e1f78d [HUDI-5990] Avoid missing data during incremental queries (#8299)
c91d7e1f78d is described below
commit c91d7e1f78dbb4a12dab23b5d4b147bfb150002a
Author: rfyu <39...@users.noreply.github.com>
AuthorDate: Fri Apr 14 01:15:13 2023 +0800
[HUDI-5990] Avoid missing data during incremental queries (#8299)
The reason for the missing data is that the timeline used by
`MergeOnReadIncrementalRelation` only contains completed
instants. When the incremental range contains an incomplete
compaction plan, fsView.getLatestMergedFileSlicesBeforeOrOn
in collectFileSplits will filter out some file slices.
---
.../hudi/MergeOnReadIncrementalRelation.scala | 4 +-
.../functional/TestParquetColumnProjection.scala | 75 ++++++++++++++++++++--
2 files changed, 73 insertions(+), 6 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 93bf730a56d..636624f3950 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -60,9 +60,9 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
override protected def timeline: HoodieTimeline = {
if (fullTableScan) {
- super.timeline
+ metaClient.getCommitsAndCompactionTimeline
} else {
- super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+ metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp, endTimestamp)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 0eefc7beeec..eaf1839d5dc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -22,12 +22,13 @@ import org.apache.calcite.runtime.SqlFunctions.abs
import org.apache.hudi.HoodieBaseRelation.projectSchema
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieMergeOnReadRDD, HoodieSparkUtils, HoodieUnsafeRDD}
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -252,7 +253,6 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
}
- // TODO add test for incremental query of the table with logs
@Test
def testMergeOnReadIncrementalRelationWithNoDeltaLogs(): Unit = {
val tablePath = s"$basePath/mor-no-logs"
@@ -296,6 +296,41 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
projectedColumnsReadStats, incrementalOpts)
}
+ @Test
+ def testMergeOnReadIncrementalRelationWithDeltaLogs(): Unit = {
+ val tablePath = s"$basePath/mor-with-logs-incr"
+ val targetRecordsCount = 100
+
+ bootstrapMORTableWithDeltaLog(tablePath, targetRecordsCount, defaultWriteOpts, populateMetaFields = true)
+
+ println(s"Running test for $tablePath / incremental")
+ /**
+ * State of timeline and updated data
+ * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+ * | timeline | deltacommit1 | deltacommit2 | deltacommit3 | compaction.request | deltacommit4 | deltacommit5 | deltacommit6 |
+ * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+ * | updated data | 001 | 002 | 003 | | 004 | 005 | 006 |
+ * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+ */
+ val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build()
+ val completedCommits = hoodieMetaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+ val startUnarchivedCommitTs = completedCommits.nthInstant(1).get().getTimestamp //deltacommit2
+ val endUnarchivedCommitTs = completedCommits.nthInstant(5).get().getTimestamp //deltacommit6
+
+ val readOpts = defaultWriteOpts ++ Map(
+ "path" -> tablePath,
+ DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startUnarchivedCommitTs,
+ DataSourceReadOptions.END_INSTANTTIME.key -> endUnarchivedCommitTs
+ )
+
+ val inputDf = spark.read.format("hudi")
+ .options(readOpts)
+ .load()
+ val commitNum = inputDf.select("rider").distinct().collect().length
+ assertTrue(commitNum > 1)
+ }
+
// Test routine
private def runTest(tableState: TableState,
queryType: String,
@@ -403,6 +438,38 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
}
}
+ private def bootstrapMORTableWithDeltaLog(path: String,
+ recordCount: Int,
+ opts: Map[String, String],
+ populateMetaFields: Boolean,
+ dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+ val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
+
+ // Step 1: Bootstrap table w/ N records (t/h bulk-insert)
+ val (insertedRecords, schema) = bootstrapTable(path, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, recordCount, opts, populateMetaFields, Some(dataGen))
+
+ for (i <- 2 to 6) {
+ val updatesCount = (insertedRecords.length * 0.5).toInt
+ val recordsToUpdate = scala.util.Random.shuffle(insertedRecords).take(updatesCount)
+ val updatedRecords = dataGen.generateUpdates("%03d".format(i), recordsToUpdate.asJava)
+
+ // Step 2: Update M records out of those (t/h update)
+ val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA)
+
+ inputDF.write.format("org.apache.hudi")
+ .options(opts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, "true")
+ .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
+ .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "3")
+ .option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
+ .mode(SaveMode.Append)
+ .save(path)
+ }
+
+ (insertedRecords, schema)
+ }
+
def measureBytesRead[T](f: () => T): (T, Int) = {
// Init BenchmarkCounter to report number of bytes actually read from the Block
BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter, fs.getConf)