Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/30 04:56:49 UTC
[hudi] 16/19: [HUDI-5639] Fixing stream identifier for single writer with spark streaming ingest (#7783)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8ccb80cb0521ee217d6cb7ae9869c3bf975be8b1
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sat Jan 28 22:50:23 2023 -0800
[HUDI-5639] Fixing stream identifier for single writer with spark streaming ingest (#7783)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 2 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 20 +++++++++++----
.../org/apache/hudi/HoodieStreamingSink.scala | 20 ++++++++++++---
.../hudi/functional/TestStructuredStreaming.scala | 30 +++++++++++++++++++++-
4 files changed, 61 insertions(+), 11 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 9ed04dae626..f3a169fc06e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -448,7 +448,7 @@ object DataSourceWriteOptions {
val STREAMING_CHECKPOINT_IDENTIFIER: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.checkpoint.identifier")
- .noDefaultValue()
+ .defaultValue("default_single_writer")
.sinceVersion("0.13.0")
.withDocumentation("A stream identifier used for HUDI to fetch the right checkpoint(`batch id` to be more specific) "
+ "corresponding this writer. Please note that keep the identifier an unique value for different writer "
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index d2a4372462d..42629789139 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,20 +18,19 @@
package org.apache.hudi
import org.apache.hadoop.fs.Path
-
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, WriteConcurrencyMode}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
-
import org.apache.log4j.LogManager
-
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, HoodieStreamSource}
@@ -40,6 +39,7 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
+import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
/**
@@ -161,6 +161,7 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
+ validateMultiWriterConfigs(optParams)
new HoodieStreamingSink(
sqlContext,
optParams,
@@ -168,6 +169,15 @@ class DefaultSource extends RelationProvider
outputMode)
}
+ def validateMultiWriterConfigs(options: Map[String, String]) : Unit = {
+ if (WriteConcurrencyMode.valueOf(options.getOrDefault(WRITE_CONCURRENCY_MODE.key(),
+ WRITE_CONCURRENCY_MODE.defaultValue())) == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) {
+ // ensure a valid value is set for the identifier
+ checkState(options.contains(STREAMING_CHECKPOINT_IDENTIFIER.key()), "For multi-writer scenarios, please set "
+ + STREAMING_CHECKPOINT_IDENTIFIER.key() + ". Each writer should set a different value for this identifier")
+ }
+ }
+
override def shortName(): String = "hudi_v1"
override def sourceSchema(sqlContext: SQLContext,
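The new guard fires only under optimistic concurrency control; single-writer jobs are unaffected. A sketch of the options two concurrent writers might pass (the lock-provider choice and table name are illustrative; the identifier and concurrency-mode keys come from the diff, and the value casing follows the enum name expected by the valueOf call above):

// Shared options for two writers running under optimistic concurrency control.
val sharedOpts = Map(
  "hoodie.write.concurrency.mode" -> "OPTIMISTIC_CONCURRENCY_CONTROL",
  "hoodie.write.lock.provider" -> "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
  "hoodie.table.name" -> "hudi_streaming_tbl"
)

// Each writer must carry a distinct identifier; without one, createSink now
// fails fast with an IllegalStateException from checkState.
val writer1Opts = sharedOpts + ("hoodie.datasource.write.streaming.checkpoint.identifier" -> "writer-1")
val writer2Opts = sharedOpts + ("hoodie.datasource.write.streaming.checkpoint.identifier" -> "writer-2")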
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index ae50e3c56c1..9830d323081 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -23,14 +23,15 @@ import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload, WriteConcurrencyMode}
+import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.common.table.marker.MarkerType
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ValidationUtils.{checkArgument, checkState}
import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, JsonUtils, StringUtils}
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
@@ -122,7 +123,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
override def accept(metaClient: HoodieTableMetaClient,
newCommitMetadata: HoodieCommitMetadata): Unit = {
- options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+ getStreamIdentifier(options) match {
case Some(identifier) =>
// Fetch the latest commit with checkpoint info again to avoid concurrency issues in multi-writer scenarios.
val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
@@ -225,6 +226,17 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}
}
+ private def getStreamIdentifier(options: Map[String, String]) : Option[String] = {
+ if (WriteConcurrencyMode.valueOf(options.getOrDefault(WRITE_CONCURRENCY_MODE.key(),
+ WRITE_CONCURRENCY_MODE.defaultValue())) == WriteConcurrencyMode.SINGLE_WRITER) {
+ // for the single-writer model, fall back to the default if not set.
+ Some(options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue()))
+ } else {
+ // in case of multi-writer scenarios, there is no default.
+ options.get(STREAMING_CHECKPOINT_IDENTIFIER.key())
+ }
+ }
+
override def toString: String = s"HoodieStreamingSink[${options("path")}]"
@annotation.tailrec
@@ -316,7 +328,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
- options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+ getStreamIdentifier(options) match {
case Some(identifier) =>
// get the latest checkpoint from the commit metadata to check whether the microbatch has already been processed
val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
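Both call sites (the commit-metadata callback and canSkipBatch) now route through the same helper, so single-writer pipelines keep checkpoint tracking even when the option is unset. A self-contained sketch of the resolution rule the helper implements (key strings and the default value are taken from the diff; this is not a public Hudi API):

object IdentifierResolution {
  private val IdentifierKey = "hoodie.datasource.write.streaming.checkpoint.identifier"
  private val ConcurrencyModeKey = "hoodie.write.concurrency.mode"

  def resolve(options: Map[String, String]): Option[String] = {
    // Single writer is the default concurrency mode if none is configured.
    val mode = options.getOrElse(ConcurrencyModeKey, "SINGLE_WRITER")
    if (mode.equalsIgnoreCase("SINGLE_WRITER")) {
      // Single writer: fall back to the new default identifier.
      Some(options.getOrElse(IdentifierKey, "default_single_writer"))
    } else {
      // Multi-writer: no default; each writer must set its own value.
      options.get(IdentifierKey)
    }
  }
}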
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index a26858dabb4..1e3356a9583 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -315,7 +315,35 @@ class TestStructuredStreaming extends HoodieClientTestBase {
assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
}
- def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+ @Test
+ def testStructuredStreamingForDefaultIdentifier(): Unit = {
+ val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+ val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ val schema = inputDF1.schema
+
+ inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+ val query1 = spark.readStream
+ .schema(schema)
+ .json(sourcePath)
+ .writeStream
+ .format("org.apache.hudi")
+ .options(commonOpts)
+ .outputMode(OutputMode.Append)
+ .option("checkpointLocation", s"$basePath/checkpoint1")
+ .start(destPath)
+
+ query1.processAllAvailable()
+ val metaClient = HoodieTableMetaClient.builder
+ .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+ assertLatestCheckpointInfoMatched(metaClient, STREAMING_CHECKPOINT_IDENTIFIER.defaultValue(), "0")
+ query1.stop()
+ }
+
+ def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
identifier: String,
expectBatchId: String): Unit = {
metaClient.reloadActiveTimeline()