You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/23 22:50:54 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

xushiyan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928170690


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +92,49 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    if (metaClient.isDefined && canSkipBatch(batchId)) {
+      val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+      assert(queryId != null)

Review Comment:
   Would prefer using validation utils to throw with a helpful message.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   Instead of using UPSERT_OPERATION_OPT_VAL directly, shouldn't we import a default value constant?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))

Review Comment:
   Not sure why we create a new Path here and then call toString on it in the next line.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))

Review Comment:
   Also, the default value seems off — do you intend to throw instead?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {

Review Comment:
   /nit StringUtils.nonEmpty()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org