You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/30 04:56:49 UTC

[hudi] 16/19: [HUDI-5639] Fixing stream identifier for single writer with spark streaming ingest (#7783)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8ccb80cb0521ee217d6cb7ae9869c3bf975be8b1
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sat Jan 28 22:50:23 2023 -0800

    [HUDI-5639] Fixing stream identifier for single writer with spark streaming ingest (#7783)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  2 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala | 20 +++++++++++----
 .../org/apache/hudi/HoodieStreamingSink.scala      | 20 ++++++++++++---
 .../hudi/functional/TestStructuredStreaming.scala  | 30 +++++++++++++++++++++-
 4 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 9ed04dae626..f3a169fc06e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -448,7 +448,7 @@ object DataSourceWriteOptions {
 
   val STREAMING_CHECKPOINT_IDENTIFIER: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.checkpoint.identifier")
-    .noDefaultValue()
+    .defaultValue("default_single_writer")
     .sinceVersion("0.13.0")
     .withDocumentation("A stream identifier used for HUDI to fetch the right checkpoint(`batch id` to be more specific) "
       + "corresponding this writer. Please note that keep the identifier an unique value for different writer "
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index d2a4372462d..42629789139 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,20 +18,19 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, WriteConcurrencyMode}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
-
 import org.apache.log4j.LogManager
-
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, HoodieStreamSource}
@@ -40,6 +39,7 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
 
 /**
@@ -161,6 +161,7 @@ class DefaultSource extends RelationProvider
                           optParams: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
+    validateMultiWriterConfigs(optParams)
     new HoodieStreamingSink(
       sqlContext,
       optParams,
@@ -168,6 +169,15 @@ class DefaultSource extends RelationProvider
       outputMode)
   }
 
+  /**
+   * Validates the streaming-sink options for multi-writer setups.
+   *
+   * When the configured write concurrency mode resolves to OPTIMISTIC_CONCURRENCY_CONTROL,
+   * the streaming checkpoint identifier key must be present in `options`; otherwise the
+   * `checkState` precondition below is violated.
+   *
+   * @param options writer options handed to the streaming sink
+   */
+  def validateMultiWriterConfigs(options: Map[String, String]) : Unit = {
+    // Enum.valueOf only accepts the exact constant name, so normalise the configured value first;
+    // a lower-case "optimistic_concurrency_control" would otherwise throw IllegalArgumentException.
+    val configuredMode = options.getOrElse(WRITE_CONCURRENCY_MODE.key(), WRITE_CONCURRENCY_MODE.defaultValue())
+    val concurrencyMode = WriteConcurrencyMode.valueOf(configuredMode.toUpperCase(java.util.Locale.ROOT))
+    if (concurrencyMode == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) {
+      // only the presence of the identifier key is checked here, not the content of its value
+      checkState(options.contains(STREAMING_CHECKPOINT_IDENTIFIER.key()), "For multi-writer scenarios, please set "
+        + STREAMING_CHECKPOINT_IDENTIFIER.key() + ". Each writer should set different values for this identifier")
+    }
+  }
+
   override def shortName(): String = "hudi_v1"
 
   override def sourceSchema(sqlContext: SQLContext,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index ae50e3c56c1..9830d323081 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -23,14 +23,15 @@ import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload, WriteConcurrencyMode}
+import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.common.table.marker.MarkerType
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ValidationUtils.{checkArgument, checkState}
 import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, JsonUtils, StringUtils}
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
@@ -122,7 +123,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
             override def accept(metaClient: HoodieTableMetaClient,
                                 newCommitMetadata: HoodieCommitMetadata): Unit = {
-              options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+              getStreamIdentifier(options) match {
                 case Some(identifier) =>
                   // Fetch the latestCommit with checkpoint Info again to avoid concurrency issue in multi-write scenario.
                   val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
@@ -225,6 +226,17 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     }
   }
 
+  /**
+   * Resolves the stream identifier for this sink from the writer options.
+   *
+   * For the SINGLE_WRITER concurrency mode the default value of
+   * STREAMING_CHECKPOINT_IDENTIFIER is used when the key is not set, so the result is always
+   * defined. For any other mode there is no fallback and the result is `None` unless the key
+   * is set explicitly.
+   *
+   * @param options writer options handed to the streaming sink
+   * @return the identifier to use, or `None` when a non-single-writer setup did not set one
+   */
+  private def getStreamIdentifier(options: Map[String, String]) : Option[String] = {
+    // Enum.valueOf only accepts the exact constant name, so normalise the configured value first;
+    // a lower-case "single_writer" would otherwise throw IllegalArgumentException.
+    val configuredMode = options.getOrElse(WRITE_CONCURRENCY_MODE.key(), WRITE_CONCURRENCY_MODE.defaultValue())
+    val concurrencyMode = WriteConcurrencyMode.valueOf(configuredMode.toUpperCase(java.util.Locale.ROOT))
+    if (concurrencyMode == WriteConcurrencyMode.SINGLE_WRITER) {
+      // for the single writer model, fall back to the default identifier if none is set.
+      Some(options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue()))
+    } else {
+      // in multi-writer scenarios there is no default; the identifier has to be set explicitly.
+      options.get(STREAMING_CHECKPOINT_IDENTIFIER.key())
+    }
+  }
+
   override def toString: String = s"HoodieStreamingSink[${options("path")}]"
 
   @annotation.tailrec
@@ -316,7 +328,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
     if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
-      options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+      getStreamIdentifier(options) match {
         case Some(identifier) =>
           // get the latest checkpoint from the commit metadata to check if the microbatch has already been prcessed or not
           val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index a26858dabb4..1e3356a9583 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -315,7 +315,35 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
   }
 
-  def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+  /**
+   * Runs a single streaming write and asserts that the latest checkpoint info is recorded
+   * under the default value of STREAMING_CHECKPOINT_IDENTIFIER.
+   */
+  @Test
+  def testStructuredStreamingForDefaultIdentifier(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    val schema = inputDF1.schema
+
+    inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+    // NOTE(review): commonOpts is defined outside this view; presumably it does not set the
+    // checkpoint identifier, which is what makes the default apply - confirm.
+    val query1 = spark.readStream
+      .schema(schema)
+      .json(sourcePath)
+      .writeStream
+      .format("org.apache.hudi")
+      .options(commonOpts)
+      .outputMode(OutputMode.Append)
+      .option("checkpointLocation", s"$basePath/checkpoint1")
+      .start(destPath)
+
+    // stop the query even when processing or the assertion fails, so it does not keep running
+    try {
+      query1.processAllAvailable()
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+      // expected batch id for the data written above is "0"
+      assertLatestCheckpointInfoMatched(metaClient, STREAMING_CHECKPOINT_IDENTIFIER.defaultValue(), "0")
+    } finally {
+      query1.stop()
+    }
+  }
+
+    def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
                                         identifier: String,
                                         expectBatchId: String): Unit = {
     metaClient.reloadActiveTimeline()