Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/13 11:55:40 UTC

[GitHub] [hudi] codope opened a new pull request, #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

codope opened a new pull request, #6098:
URL: https://github.com/apache/hudi/pull/6098

   ## What is the purpose of the pull request
   
   In HoodieStreamingSink, if a microbatch gets committed in Hudi but the Spark checkpoint write fails, Spark will retry the batch and Hudi will rewrite the data, causing duplication. This PR makes the sink idempotent by adding the batchId to the Hudi commit metadata and, on every batch, comparing the current batchId against the previously committed one, skipping the write if it is a retry.
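   The check described above can be sketched as a minimal, self-contained model. The object and method names below are illustrative only, not the actual Hudi API; the real sink persists the batchId in Hudi commit metadata rather than an in-memory map:

```scala
// Minimal model of the idempotent sink: the last committed batchId is
// persisted under a checkpoint key and compared against the incoming batchId.
object IdempotentSinkSketch {
  // stands in for the metadata map Hudi persists with each commit
  private var commitMetadata: Map[String, String] = Map.empty
  private val CheckpointKey = "_streaming_sink_checkpoint"

  // a batch can be skipped if an equal or newer batchId was already committed
  def canSkipBatch(batchId: Long): Boolean =
    commitMetadata.get(CheckpointKey).exists(_.toLong >= batchId)

  private def commit(batchId: Long): Unit =
    commitMetadata += (CheckpointKey -> batchId.toString)

  // returns true if the batch was written, false if it was a skipped retry
  def addBatch(batchId: Long): Boolean = {
    if (canSkipBatch(batchId)) return false
    commit(batchId)
    true
  }
}
```

   With this model, a Spark retry of an already committed batch becomes a no-op instead of a duplicate write.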
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1191476903

   ## CI report:
   
   * 94000c64ce7d980d93f28d6df44b71268627f6e8 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10140) 
   * 3e2c3c6b370ed098e5613041a5d92b2f438701de Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10143) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1231740485

   ## CI report:
   
   * 8a9c5d762c79b41da583561fb827317fb572effd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10265) 
   * 46fb6bd1261664d3aaa0fbbd7d33ce968ba8f051 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11051) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928103368


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,61 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check whether the microbatch has already been processed
+    if (metaClient != null) {

Review Comment:
   That's a very good point! The intention is to check it in every batch. Actually, I did initialize the metaClient in my base commit, but I missed it after the rebase. Thanks for catching it. I am now initializing the metaClient only once and just reloading the active timeline in each batch.
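   A self-contained sketch of that pattern (class names are illustrative, not the actual `HoodieTableMetaClient` API): build the client once, tolerating the table not existing on the very first microbatch, and only reload the timeline on later batches.

```scala
// Stand-in for a meta client whose timeline can be refreshed cheaply.
class TimelineClient {
  var reloads: Int = 0
  def reloadActiveTimeline(): Unit = reloads += 1
}

object MetaClientHolder {
  private var metaClient: Option[TimelineClient] = None

  // On the first batch the table may not exist yet, so construction is
  // deferred; afterwards the same instance is reused and only the
  // timeline is reloaded, instead of rebuilding the client per batch.
  def forBatch(tableExists: Boolean): Option[TimelineClient] = {
    if (metaClient.isEmpty && tableExists) metaClient = Some(new TimelineClient)
    metaClient.foreach(_.reloadActiveTimeline())
    metaClient
  }
}
```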





[GitHub] [hudi] vinothchandar commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928368463


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))
+  private var metaClient: Option[HoodieTableMetaClient] = {
+    try {
+      Some(HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration).setBasePath(tablePath.toString).build())
+    } catch {
+      case _: TableNotFoundException =>
+        log.warn("Ignore TableNotFoundException as it is first microbatch.")
+        Option.empty
+    }
+  }
+  private val retryCnt = options.getOrDefault(STREAMING_RETRY_CNT.key,
+    STREAMING_RETRY_CNT.defaultValue).toInt
+  private val retryIntervalMs = options.getOrDefault(STREAMING_RETRY_INTERVAL_MS.key,
+    STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
+  private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,
+    STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
+  private val SinkCheckpointKey = "_streaming_sink_checkpoint"

Review Comment:
   Add a `_hudi` prefix to the key?





[GitHub] [hudi] boneanxs commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
boneanxs commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1278431648

   We found that if two Spark Structured Streaming jobs write data to the same Hudi table (with multi-writer enabled), this check can misfire and make one writer skip batches...
   
   ```java
   2022-10-12 14:33:25 INFO  HoodieActiveTimeline:170 Loaded instants upto : Option{val=[20221012143212859__commit__COMPLETED]}
   2022-10-12 14:33:25 WARN  HoodieStreamingSink:108 Skipping already completed batch 0 in query 498e6b86-7401-44d8-9b02-2b16fade25b8
   ```




[GitHub] [hudi] vinothchandar commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926077601


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -319,6 +319,11 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "lowest and best effort file sizing. "
           + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads");
 
+  public static final ConfigProperty<String> BULK_INSERT_WRITE_STREAM_ENABLE = ConfigProperty
+          .key("hoodie.bulkinsert.write.stream")
+          .defaultValue("false")
+          .withDocumentation("Enable this config to do bulk insert with `writeStream` dataset using row-writer path, instead of converting to RDD");

Review Comment:
   yes can we make this the default?





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1191405015

   ## CI report:
   
   * 6e1c286b7519cafc44505bfcfb3033b5fc3b5dc3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=9892) 
   * 94000c64ce7d980d93f28d6df44b71268627f6e8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193072299

   ## CI report:
   
   * f9c41cc058c4b4b82b26c025b862941577117c96 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10240) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1183172372

   ## CI report:
   
   * 6e1c286b7519cafc44505bfcfb3033b5fc3b5dc3 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=9892) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1233902559

   ## CI report:
   
   * 46fb6bd1261664d3aaa0fbbd7d33ce968ba8f051 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11051) 
   * ef925520b421e774bb22f90f89b300ce70ca620a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926558193


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row-writer bulk insert for the write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   I am adding it to extraMetadata, but Siva suggested a better way to handle this. Now a config `_streaming_sink_checkpoint` is added for this purpose. Any config prefixed with `_` gets added to the extra metadata.
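   A sketch of the convention described above (assumed behavior, not the exact Hudi implementation): write options whose keys start with `_` are copied into the commit's extra metadata, so setting `_streaming_sink_checkpoint -> batchId` persists the checkpoint alongside the commit.

```scala
// Filter write options down to the entries that should land in the
// commit's extra metadata: every key with a leading underscore.
object ExtraMetadataSketch {
  def extraMetadataFrom(options: Map[String, String]): Map[String, String] =
    options.filter { case (k, _) => k.startsWith("_") }
}
```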





[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926490072


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row-writer bulk insert for the write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   bulk_insert does not work with a streaming DataFrame unless we convert it to an RDD.





[GitHub] [hudi] codope merged pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope merged PR #6098:
URL: https://github.com/apache/hudi/pull/6098




[GitHub] [hudi] xushiyan commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928194615


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))
+  private var metaClient: Option[HoodieTableMetaClient] = {
+    try {
+      Some(HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration).setBasePath(tablePath.toString).build())
+    } catch {
+      case _: TableNotFoundException =>
+        log.warn("Ignore TableNotFoundException as it is first microbatch.")
+        Option.empty
+    }
+  }
+  private val retryCnt = options.getOrDefault(STREAMING_RETRY_CNT.key,
+    STREAMING_RETRY_CNT.defaultValue).toInt
+  private val retryIntervalMs = options.getOrDefault(STREAMING_RETRY_INTERVAL_MS.key,
+    STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
+  private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,
+    STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
+  private val SinkCheckpointKey = "_streaming_sink_checkpoint"

Review Comment:
   Also, shouldn't `SinkCheckpointKey` be ALL_CAPS?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check whether the microbatch has already been processed
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+        latestBatchId = lastCheckpoint.toLong
+      }
+    }
+    latestBatchId >= batchId

Review Comment:
   OK. In the multi-writer case, I'm not sure the current commit metadata in the form of a single KV pair would satisfy future design requirements. We may need a more flexible structure like k={a:b}? Just want to avoid handling backward compatibility later.
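   One way to illustrate that suggestion (a hypothetical sketch, not a Hudi design): store a map of queryId to batchId under the checkpoint key instead of a single value, so two streaming writers on the same table cannot clobber each other's checkpoints. A real implementation would likely encode the map as JSON; a simple delimiter encoding is used here to keep the sketch self-contained.

```scala
// Checkpoint as a map of queryId -> last committed batchId, flattened to a
// single metadata value with a "q1=3,q2=0" style encoding.
object MultiWriterCheckpointSketch {
  def encode(cp: Map[String, Long]): String =
    cp.map { case (q, b) => s"$q=$b" }.mkString(",")

  def decode(s: String): Map[String, Long] =
    if (s.isEmpty) Map.empty
    else s.split(",").map { kv =>
      val Array(q, b) = kv.split("=")
      q -> b.toLong
    }.toMap

  // a writer only skips a batch its own query already committed
  def canSkipBatch(encoded: String, queryId: String, batchId: Long): Boolean =
    decode(encoded).get(queryId).exists(_ >= batchId)
}
```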



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))
+  private var metaClient: Option[HoodieTableMetaClient] = {
+    try {
+      Some(HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration).setBasePath(tablePath.toString).build())
+    } catch {
+      case _: TableNotFoundException =>
+        log.warn("Ignore TableNotFoundException as it is first microbatch.")
+        Option.empty
+    }
+  }
+  private val retryCnt = options.getOrDefault(STREAMING_RETRY_CNT.key,
+    STREAMING_RETRY_CNT.defaultValue).toInt
+  private val retryIntervalMs = options.getOrDefault(STREAMING_RETRY_INTERVAL_MS.key,
+    STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
+  private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,
+    STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
+  private val SinkCheckpointKey = "_streaming_sink_checkpoint"

Review Comment:
   The name `_streaming_sink_checkpoint` may be too generic since we only mean it for Spark streaming; it could imply Flink or another streaming writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] codope commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193500450

   > I did not fully understand the bulk insert row writing part. But Can we get it fixed in 0.12 please
   
   Yes that's gonna be in 0.12. It's in #6099 but stacked on top of this one. I will decouple the two.




[GitHub] [hudi] nsivabalan commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1189858324

   Discussed offline: 
   We can leverage `hoodie.datasource.write.commitmeta.key.prefix` to add the extra metadata, so we don't need to fix the completed commit metadata manually.
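As a rough illustration of that suggestion (the prefix semantics here are an assumption for illustration, not Hudi's exact behavior): any write option whose key starts with the configured prefix would be carried into the commit metadata at write time, so the sink never has to rewrite a completed commit file afterwards.

```python
# Hypothetical sketch of prefix-based extra commit metadata.
COMMIT_META_KEY_PREFIX = "hoodie.datasource.write.commitmeta.key.prefix"

def extra_commit_metadata(options):
    """Collect the write options that should be copied into the commit metadata."""
    prefix = options.get(COMMIT_META_KEY_PREFIX, "")
    if not prefix:
        return {}
    return {k: v for k, v in options.items() if k.startswith(prefix)}

opts = {
    COMMIT_META_KEY_PREFIX: "_streaming",
    "_streaming_sink_checkpoint": "42",   # the batch id the sink wants persisted
    "hoodie.table.name": "t1",            # unrelated option, not copied
}
print(extra_commit_metadata(opts))  # {'_streaming_sink_checkpoint': '42'}
```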




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193191447

   ## CI report:
   
   * 8a9c5d762c79b41da583561fb827317fb572effd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10265) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] YuweiXiao commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
YuweiXiao commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928091341


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,61 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+    if (latestBatchId >= batchId) {
+      val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+      assert(queryId != null)
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
+
     // Override to use direct markers. In Structured streaming, timeline server is closed after
     // first micro-batch and subsequent micro-batches do not have timeline server running.
     // Thus, we can't use timeline-server-based markers.
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+    // add batchId as checkpoint to the extra metadata
+    updatedOptions = updatedOptions.updated(SinkCheckpointKey, batchId.toString)
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
           log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {

Review Comment:
   The formatting here is not right.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,61 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    if (metaClient != null) {

Review Comment:
   On restart, the metaClient is null (it seems to be initialized only after the first batch), so we will not check lastBatchId for the first batch after a start (or a restart after a crash).
   
   And it seems the whole checking logic is only necessary for the first batch, right? If so, could we save the checking cost by doing it only once, for the first batch?
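The reviewer's suggestion can be sketched like this (hypothetical names, not the actual Scala): do the timeline lookup lazily, only for the first batch handled by this sink instance, and trust the in-memory latest batch id afterwards.

```python
class IdempotentSink:
    """Illustrative sketch: check the committed batch id once, then cache it."""

    def __init__(self, read_committed_batch_id):
        # read_committed_batch_id stands in for the timeline/metadata scan
        self._read_committed_batch_id = read_committed_batch_id
        self._latest_batch_id = None  # None => not yet checked

    def should_skip(self, batch_id):
        if self._latest_batch_id is None:  # first batch after (re)start
            self._latest_batch_id = self._read_committed_batch_id()
        return batch_id <= self._latest_batch_id

    def on_commit_success(self, batch_id):
        self._latest_batch_id = batch_id

sink = IdempotentSink(lambda: 5)   # pretend batch 5 was committed before a crash
print(sink.should_skip(5))  # True  (retry of an already-committed batch)
print(sink.should_skip(6))  # False (new batch proceeds)
```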



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,61 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    if (metaClient != null) {

Review Comment:
   Could we extract the checking logic into a function? e.g., `checkCanSkipBatch(xxx)`





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193174588

   ## CI report:
   
   * bd8b9c43cf0459638381da6d47e03db3d4ff1f8b Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10252) 
   * 8a9c5d762c79b41da583561fb827317fb572effd Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10265) 
   




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1233910225

   ## CI report:
   
   * 46fb6bd1261664d3aaa0fbbd7d33ce968ba8f051 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11051) 
   * ef925520b421e774bb22f90f89b300ce70ca620a Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11085) 
   




[GitHub] [hudi] nsivabalan commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r923241495


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala:
##########
@@ -50,6 +51,10 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName,
+    DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+    // HoodieWriteConfig.BULK_INSERT_WRITE_STREAM_ENABLE.key -> "true",

Review Comment:
   Do the tests need more fixes?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   may I know why we are disabling row writer for bulk_insert?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)

Review Comment:
   Not required in this patch, but do we need to design/implement this similar to how DeltaStreamer checkpointing is done? With DeltaStreamer, it's feasible to run one writer with DS and another writer with the Spark datasource, and DeltaStreamer will still be able to fetch the right checkpoint to resume from every time. 
   Here I see we are fetching only the latest commit, so this may not work with multi-writer scenarios. Maybe we can create a follow-up ticket and work on it rather than expanding the scope of this patch. 
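A sketch of the DeltaStreamer-style lookup the reviewer describes (illustrative Python, not Hudi's API): instead of reading only the last commit, walk the completed commits from newest to oldest and resume from the first one that carries this writer's checkpoint key, so commits made by other writers are skipped over.

```python
def latest_checkpoint(completed_commits, key):
    """completed_commits: list of commit-metadata dicts, oldest first."""
    for metadata in reversed(completed_commits):  # newest to oldest
        if key in metadata:
            return metadata[key]
    return None  # no commit from this writer yet

commits = [
    {"_streaming_sink_checkpoint": "3"},   # this sink's commit
    {"other.writer.meta": "x"},            # latest commit, made by a different writer
]
print(latest_checkpoint(commits, "_streaming_sink_checkpoint"))  # 3
```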



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   And I guess this batchId-in-commit-metadata is not applicable when the `STREAMING_IGNORE_FAILED_BATCH` config is set to true?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   BTW, instead of fixing it here, do you think we can do it at L157 (old code) or L212 as per this patch? Or, if this is the only place where the commit has actually succeeded in Hudi and we have the commit metadata available to record the batchId, let me know; in that case it makes sense.
   
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   I am not very fond of the way we solve this, but I guess there is not much we can do. I will keep thinking about whether there is any better way to do this. But for now, I will proceed w/ the review.
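For readers following the diff above: the skip logic boils down to persisting the batch id as a checkpoint inside the commit metadata, and on the next write comparing the incoming batch id against the last persisted one. A minimal standalone sketch of that pattern (plain Java; the class, key name, and in-memory map are illustrative stand-ins, not Hudi's actual API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a streaming sink that records the last committed
// batch id in commit metadata and skips retries of already-committed batches.
public class IdempotentSinkSketch {
    // Hypothetical metadata key; Hudi uses its own checkpoint key constant.
    static final String CHECKPOINT_KEY = "sink.checkpoint.batchId";

    private long latestBatchId = -1L;
    private final Map<String, String> lastCommitMetadata = new HashMap<>();

    // True if this batch was already committed, i.e. this call is a Spark retry
    // after a checkpoint-write failure.
    public boolean canSkipBatch(long batchId) {
        String lastCheckpoint = lastCommitMetadata.get(CHECKPOINT_KEY);
        if (lastCheckpoint != null && !lastCheckpoint.isEmpty()) {
            latestBatchId = Long.parseLong(lastCheckpoint);
        }
        return latestBatchId >= batchId;
    }

    // On a successful write, persist the batch id alongside the commit.
    public void commit(long batchId) {
        latestBatchId = batchId;
        lastCommitMetadata.put(CHECKPOINT_KEY, Long.toString(batchId));
    }
}
```

The real implementation reads the checkpoint back from the last completed instant on the timeline, so the skip also survives a driver restart; the in-memory map above merely stands in for that persisted metadata.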



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -46,14 +48,23 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     with Serializable {
   @volatile private var latestBatchId = -1L
 
+  /*@transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()

Review Comment:
   can we fix this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1183186300

   ## CI report:
   
   * 6e1c286b7519cafc44505bfcfb3033b5fc3b5dc3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=9892) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926493385


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)

Review Comment:
   Yes, this is not going to work w/ multi-writer. We need better incremental checkpoint management around this. Filed HUDI-4432 to track.





[GitHub] [hudi] YuweiXiao commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
YuweiXiao commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926574083


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala:
##########
@@ -50,6 +51,10 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName,
+    DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+    // HoodieWriteConfig.BULK_INSERT_WRITE_STREAM_ENABLE.key -> "true",

Review Comment:
   I injected a configurable error throw in my own project to verify the rare failover case. Not sure it is a suitable way here, as the error injection makes the code less readable and ugly.





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1191409861

   ## CI report:
   
   * 6e1c286b7519cafc44505bfcfb3033b5fc3b5dc3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=9892) 
   * 94000c64ce7d980d93f28d6df44b71268627f6e8 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10140) 
   


[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1231732758

   ## CI report:
   
   * 8a9c5d762c79b41da583561fb827317fb572effd Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10265) 
   * 46fb6bd1261664d3aaa0fbbd7d33ce968ba8f051 UNKNOWN
   


[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926556389


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -319,6 +319,11 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "lowest and best effort file sizing. "
           + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads");
 
+  public static final ConfigProperty<String> BULK_INSERT_WRITE_STREAM_ENABLE = ConfigProperty
+          .key("hoodie.bulkinsert.write.stream")
+          .defaultValue("false")
+          .withDocumentation("Enable this config to do bulk insert with `writeStream` dataset using row-writer path, instead of converting to RDD");

Review Comment:
   Actually this config is not needed here; it's required in a separate patch, #6099, stacked on top of this one.





[GitHub] [hudi] nsivabalan commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1234719141

   Can you check the CI failure?




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193108576

   ## CI report:
   
   * f9c41cc058c4b4b82b26c025b862941577117c96 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10240) 
   * bd8b9c43cf0459638381da6d47e03db3d4ff1f8b Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10252) 
   


[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193107974

   }-->
   ## CI report:
   
   * f9c41cc058c4b4b82b26c025b862941577117c96 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10240) 
   * bd8b9c43cf0459638381da6d47e03db3d4ff1f8b UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193118762

   ## CI report:
   
   * bd8b9c43cf0459638381da6d47e03db3d4ff1f8b Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10252) 
   


[GitHub] [hudi] danny0405 commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r925287602


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -319,6 +319,11 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "lowest and best effort file sizing. "
           + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads");
 
+  public static final ConfigProperty<String> BULK_INSERT_WRITE_STREAM_ENABLE = ConfigProperty
+          .key("hoodie.bulkinsert.write.stream")
+          .defaultValue("false")
+          .withDocumentation("Enable this config to do bulk insert with `writeStream` dataset using row-writer path, instead of converting to RDD");

Review Comment:
   Seems like an improvement, so is this config option necessary?





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1238312251

   ## CI report:
   
   * 1d2a193ac4bf4df359d1f6f6de7a3ec4d427025a Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11121) 
   * 60e31e247652316f4b6116ad1ef1a4858d68107e Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11189) 
   


[GitHub] [hudi] YuweiXiao commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
YuweiXiao commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928090269


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,61 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    if (metaClient != null) {

Review Comment:
   Could we extract the checking logic to a function? e.g., checkCanSkipBatch(xxx)





[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928182465


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+        latestBatchId = lastCheckpoint.toLong
+      }
+    }
+    latestBatchId >= batchId

Review Comment:
   Yes, this is not meant to work for multiple writers with multiple Spark checkpoint locations writing to the same table.





[GitHub] [hudi] xushiyan commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928172917


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+        latestBatchId = lastCheckpoint.toLong
+      }
+    }
+    latestBatchId >= batchId

Review Comment:
   So we deserialize the batch id from storage. How can we guarantee it is always monotonically increasing? Users can start a completely new streaming job against the same table; will the ids then become incomparable?
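To make the concern concrete: Spark's batch ids are scoped to a checkpoint location and restart from 0 for a fresh query, so an id persisted by an earlier job can mask a new job's early batches if the comparison is applied blindly. A standalone illustration of just the comparison (not Hudi code; names are illustrative):

```java
public class BatchIdComparison {
    // The skip check as written in the diff: a batch is skipped when the
    // persisted checkpoint id is at least as large as the incoming batch id.
    public static boolean canSkip(long persistedBatchId, long incomingBatchId) {
        return persistedBatchId >= incomingBatchId;
    }

    public static void main(String[] args) {
        // A previous streaming job committed up to batch 57; a brand-new job
        // with a fresh checkpoint location starts again at batch 0.
        long persisted = 57L;
        System.out.println(canSkip(persisted, 0L));  // the new job's batch 0 looks like a retry
        System.out.println(canSkip(persisted, 58L)); // only ids beyond 57 would be written
    }
}
```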





[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926535477


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala:
##########
@@ -50,6 +51,10 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName,
+    DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+    // HoodieWriteConfig.BULK_INSERT_WRITE_STREAM_ENABLE.key -> "true",

Review Comment:
   this should not be part of the PR; I'll remove it.
   For the unit test itself, I tried but it's hard. When I attempt to simulate the scenario, the whole test gets killed. However, I have manually verified the fix.





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1191645705

   ## CI report:
   
   * 3e2c3c6b370ed098e5613041a5d92b2f438701de Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10143) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r960310722


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))

Review Comment:
   corrected





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1234015049

   ## CI report:
   
   * ef925520b421e774bb22f90f89b300ce70ca620a Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11085) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1238468099

   ## CI report:
   
   * 60e31e247652316f4b6116ad1ef1a4858d68107e Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11189) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926600224


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+    if (latestBatchId >= batchId) {
+      val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+      assert(queryId != null)
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
+
     // Override to use direct markers. In Structured streaming, timeline server is closed after
     // first micro-batch and subsequent micro-batches do not have timeline server running.
     // Thus, we can't use timeline-server-based markers.
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+    // add batchId as checkpoint to the extra metadata
+    updatedOptions = updatedOptions.updated(SinkCheckpointKey, batchId.toString)
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"

Review Comment:
   ah yes good catch.. thanks





[GitHub] [hudi] vinothchandar commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928369447


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   Row writing is a top priority, no? I'd love to understand this more.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+        latestBatchId = lastCheckpoint.toLong
+      }
+    }
+    latestBatchId >= batchId

Review Comment:
   +1. It might be good to make the data model support multiple values from day one.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))
+  private var metaClient: Option[HoodieTableMetaClient] = {
+    try {
+      Some(HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration).setBasePath(tablePath.toString).build())
+    } catch {
+      case _: TableNotFoundException =>
+        log.warn("Ignore TableNotFoundException as it is first microbatch.")
+        Option.empty
+    }
+  }
+  private val retryCnt = options.getOrDefault(STREAMING_RETRY_CNT.key,
+    STREAMING_RETRY_CNT.defaultValue).toInt
+  private val retryIntervalMs = options.getOrDefault(STREAMING_RETRY_INTERVAL_MS.key,
+    STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
+  private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,

Review Comment:
   TBH I think we should make it fail by default and not ignore. The original author from Apple wanted it that way for them. But that probably does not make sense at this point anymore.
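The policy under discussion can be sketched as a small branch on the option's value. This is an illustrative stand-in, not the sink's actual implementation; the parameter name mirrors `STREAMING_IGNORE_FAILED_BATCH` but the function is hypothetical.

```scala
// Sketch of the two failure-handling policies for a failed micro-batch:
// either log and drop the batch (current default, risks data loss), or
// fail fast so Spark restarts the batch from its own checkpoint.
def handleFailedBatch(ignoreFailedBatch: Boolean, batchId: Long, err: Throwable): Unit =
  if (ignoreFailedBatch) {
    // batch is silently dropped; downstream readers will never see its records
    println(s"Ignoring failed micro-batch $batchId: ${err.getMessage}")
  } else {
    // proposed default: surface the error so the streaming query fails and retries
    throw new IllegalStateException(s"Micro-batch $batchId failed", err)
  }
```

Failing fast composes well with the idempotency fix in this PR: a retried batch that already committed is skipped, so rethrowing no longer risks duplicates.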





[GitHub] [hudi] YuweiXiao commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
YuweiXiao commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r923306507


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   I guess we could add the `batchId` to the `extraMetadata` field of the commit metadata. I did a similar thing in my own streaming job.
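The suggestion boils down to stashing the batch id in the commit's extra-metadata map on write and reading it back on recovery. The sketch below uses a plain `Map[String, String]` as a stand-in for `HoodieCommitMetadata`'s extra metadata, and the key name is assumed, not Hudi's actual constant.

```scala
// Assumed key under which the sink's checkpoint would be stored.
val SinkCheckpointKey = "_hudi_streaming_sink_checkpoint"

// On commit: record the micro-batch id alongside any other extra metadata.
def withBatchId(extraMetadata: Map[String, String], batchId: Long): Map[String, String] =
  extraMetadata + (SinkCheckpointKey -> batchId.toString)

// On recovery: read the last committed batch id back, tolerating missing or
// malformed values (e.g. commits written before this feature existed).
def lastCommittedBatchId(extraMetadata: Map[String, String]): Option[Long] =
  extraMetadata.get(SinkCheckpointKey).flatMap(s => scala.util.Try(s.toLong).toOption)
```

Because the value round-trips through the commit itself, the check survives driver restarts without relying on any sink-local state.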





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193048695

   ## CI report:
   
   * 3e2c3c6b370ed098e5613041a5d92b2f438701de Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10143) 
   * f9c41cc058c4b4b82b26c025b862941577117c96 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r960310971


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {

Review Comment:
   corrected





[GitHub] [hudi] nsivabalan commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r959903545


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +295,41 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {

Review Comment:
   minor. `batchId` -> `incomingBatchId` or `toBeIngestedBatchId` 
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {

Review Comment:
   I thought we had discussed that this checkpoint-fetching logic should be similar to the deltastreamer's: we have to walk back over commits until we find a commit metadata entry with a valid checkpoint.
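The walk-back the reviewer describes can be sketched independently of the timeline API: scan completed commits from newest to oldest and stop at the first one carrying a checkpoint. `CommitMeta` below is a stand-in for `HoodieCommitMetadata`, and the key name is assumed; real code would iterate instants off the active timeline.

```scala
// Minimal model of a completed commit and its extra metadata.
final case class CommitMeta(instantTime: String, metadata: Map[String, String])

// Assumed key under which the sink's checkpoint is stored.
val SinkCheckpointKey = "_hudi_streaming_sink_checkpoint"

// Walk commits newest-first and return the first non-empty checkpoint found.
// This skips commits without one (e.g. table-service commits like compaction)
// instead of only inspecting the single latest instant.
def latestCheckpoint(commitsOldestFirst: Seq[CommitMeta]): Option[String] =
  commitsOldestFirst.reverseIterator
    .flatMap(_.metadata.get(SinkCheckpointKey))
    .find(_.nonEmpty)
```

Checking only `lastInstant()` would miss the checkpoint whenever the most recent completed commit came from a table service rather than the sink, which is exactly the gap the walk-back closes.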



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +295,41 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SINK_CHECKPOINT_KEY)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+        latestBatchId = HoodieSinkCheckpoint.fromJson(lastCheckpoint).keys.head.toLong

Review Comment:
   Similarly, we can rename this to `latestCommittedBatchId`.





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1232188651

   ## CI report:
   
   * 46fb6bd1261664d3aaa0fbbd7d33ce968ba8f051 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11051) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1235754483

   ## CI report:
   
   * 1d2a193ac4bf4df359d1f6f6de7a3ec4d427025a Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11121) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1183167610

   ## CI report:
   
   * 6e1c286b7519cafc44505bfcfb3033b5fc3b5dc3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926557136


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   Since we are not landing #6099, we need this conditional here for now.





[GitHub] [hudi] YuweiXiao commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
YuweiXiao commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r926575605


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    if (metaClient != null) {
+      val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+        val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+    if (latestBatchId >= batchId) {
+      val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+      assert(queryId != null)
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
+
     // Override to use direct markers. In Structured streaming, timeline server is closed after
     // first micro-batch and subsequent micro-batches do not have timeline server running.
     // Thus, we can't use timeline-server-based markers.
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+    // add batchId as checkpoint to the extra metadata
+    updatedOptions = updatedOptions.updated(SinkCheckpointKey, batchId.toString)
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"

Review Comment:
   log.warn -> log.info?





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1191414155

   ## CI report:
   
   * 94000c64ce7d980d93f28d6df44b71268627f6e8 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10140) 
   * 3e2c3c6b370ed098e5613041a5d92b2f438701de UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193049274

   ## CI report:
   
   * 3e2c3c6b370ed098e5613041a5d92b2f438701de Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10143) 
   * f9c41cc058c4b4b82b26c025b862941577117c96 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10240) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] YuweiXiao commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
YuweiXiao commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193080716

   > Not sure why CI failed. I grepped the [full logs](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_apis/build/builds/10240/logs/34) and see that tests succeeded (a couple of tests were skipped, but there were no failures):
   > 
   > ```
   > sagars@Sagars-MacBook-Pro hudi % cat 34 | grep -i "Tests run:"
   > 2022-07-23T04:50:23.5084896Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.294 s - in org.apache.hudi.aws.transaction.integ.ITTestDynamoDBBasedLockProvider
   > 2022-07-23T04:50:23.8452776Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0
   > 2022-07-23T04:53:54.2007471Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 132.301 s - in org.apache.hudi.cli.integ.ITTestRepairsCommand
   > 2022-07-23T04:54:49.7358601Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 55.517 s - in org.apache.hudi.cli.integ.ITTestSavepointsCommand
   > 2022-07-23T04:56:02.1501553Z [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 72.377 s - in org.apache.hudi.cli.integ.ITTestBootstrapCommand
   > 2022-07-23T04:56:15.0361575Z [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 12.845 s - in org.apache.hudi.cli.integ.ITTestMarkersCommand
   > 2022-07-23T04:56:15.0581391Z [WARNING] Tests run: 1, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 0.001 s - in org.apache.hudi.cli.integ.ITTestCommitsCommand
   > 2022-07-23T04:56:15.0660649Z [WARNING] Tests run: 1, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 0.001 s - in org.apache.hudi.cli.integ.ITTestHDFSParquetImportCommand
   > 2022-07-23T05:00:09.2717006Z [INFO] Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 234.201 s - in org.apache.hudi.cli.integ.ITTestCompactionCommand
   > 2022-07-23T05:01:59.2645172Z [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 109.982 s - in org.apache.hudi.cli.integ.ITTestClusteringCommand
   > 2022-07-23T05:01:59.7366818Z [WARNING] Tests run: 22, Failures: 0, Errors: 0, Skipped: 2
   > ```
   
   Maybe a retry could help; I see the following error under `IT modules`:
   
   `##[error]We stopped hearing from agent Azure Pipelines 5. Verify the agent machine is running and has a healthy network connection. Anything that terminates an agent process, starves it for CPU, or blocks its network access can cause this error. For more information, see: https://go.microsoft.com/fwlink/?linkid=846610`
   




[GitHub] [hudi] codope commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193078774

   Not sure why CI failed. I grepped the full logs and see that tests succeeded (a couple of tests were skipped, but there were no failures):
   ```
   sagars@Sagars-MacBook-Pro hudi % cat 34 | grep -i "Tests run:"
   2022-07-23T04:50:23.5084896Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.294 s - in org.apache.hudi.aws.transaction.integ.ITTestDynamoDBBasedLockProvider
   2022-07-23T04:50:23.8452776Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0
   2022-07-23T04:53:54.2007471Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 132.301 s - in org.apache.hudi.cli.integ.ITTestRepairsCommand
   2022-07-23T04:54:49.7358601Z [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 55.517 s - in org.apache.hudi.cli.integ.ITTestSavepointsCommand
   2022-07-23T04:56:02.1501553Z [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 72.377 s - in org.apache.hudi.cli.integ.ITTestBootstrapCommand
   2022-07-23T04:56:15.0361575Z [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 12.845 s - in org.apache.hudi.cli.integ.ITTestMarkersCommand
   2022-07-23T04:56:15.0581391Z [WARNING] Tests run: 1, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 0.001 s - in org.apache.hudi.cli.integ.ITTestCommitsCommand
   2022-07-23T04:56:15.0660649Z [WARNING] Tests run: 1, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 0.001 s - in org.apache.hudi.cli.integ.ITTestHDFSParquetImportCommand
   2022-07-23T05:00:09.2717006Z [INFO] Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 234.201 s - in org.apache.hudi.cli.integ.ITTestCompactionCommand
   2022-07-23T05:01:59.2645172Z [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 109.982 s - in org.apache.hudi.cli.integ.ITTestClusteringCommand
   2022-07-23T05:01:59.7366818Z [WARNING] Tests run: 22, Failures: 0, Errors: 0, Skipped: 2
   ```




[GitHub] [hudi] codope commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
codope commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928103368


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +82,61 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    if (metaClient != null) {

Review Comment:
   That's a very good point! The intention is to check it in every batch. Actually, I did initialize the metaClient in my base commit, but I missed it after the rebase. Thanks for catching it. I have fixed it.
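   For context, the per-batch initialization being discussed could look roughly like the sketch below. This is illustrative only, not the merged code; the method name and the `TableNotFoundException` handling are assumptions about how one might structure it:
   
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hudi.common.table.HoodieTableMetaClient
   import org.apache.hudi.exception.TableNotFoundException
   
   // Illustrative sketch: (re)build the meta client on each micro-batch so the
   // idempotency check also works for a table created after the sink started.
   private def getMetaClient(conf: Configuration, tablePath: String): Option[HoodieTableMetaClient] = {
     try {
       Some(HoodieTableMetaClient.builder()
         .setConf(conf)
         .setBasePath(tablePath)
         .build())
     } catch {
       case _: TableNotFoundException =>
         // table does not exist yet; nothing has been committed, so nothing to skip
         None
     }
   }
   ```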





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1193173954

   ## CI report:
   
   * bd8b9c43cf0459638381da6d47e03db3d4ff1f8b Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10252) 
   * 8a9c5d762c79b41da583561fb827317fb572effd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] xushiyan commented on a diff in pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928170690


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +92,49 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown unexpectedly")
     }
+
+    if (metaClient.isDefined && canSkipBatch(batchId)) {
+      val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+      assert(queryId != null)

Review Comment:
   Would prefer using a validation util to throw with a helpful message.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   Instead of using `UPSERT_OPERATION_OPT_VAL` directly, shouldn't we import a default-value constant?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))

Review Comment:
   Not sure why we create a new `Path` here and then call `toString` on it in the next line.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' option"))

Review Comment:
   Also, the default value seems off; do you intend to throw?
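   If throwing is the intent, one way to make it explicit is sketched below. This is a suggestion, not the PR's actual code, and the choice of `HoodieException` is an assumption:
   
   ```scala
   import org.apache.hudi.exception.HoodieException
   
   // Illustrative: Scala's getOrElse evaluates its default lazily, so this throws
   // only when the 'path' option is actually missing.
   private val tablePath: String = options.getOrElse("path",
     throw new HoodieException("'path' must be specified for the Hoodie streaming sink"))
   ```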



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+    val lastCommit = metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {

Review Comment:
   /nit StringUtils.nonEmpty()
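   Putting the review suggestions together (the `StringUtils.nonEmpty` nit and the `latestCommittedBatchId` rename), `canSkipBatch` might look roughly like this. This is a sketch under the assumption that `SinkCheckpointKey` and `metaClient` are in scope on the sink; it is not the merged code:
   
   ```scala
   import org.apache.hudi.common.model.HoodieCommitMetadata
   import org.apache.hudi.common.util.StringUtils
   
   // Illustrative sketch: skip a micro-batch if its batchId is <= the batchId
   // recorded in the last completed commit's extra metadata.
   private def canSkipBatch(batchId: Long): Boolean = {
     val lastCommit = metaClient.get.getActiveTimeline
       .getCommitsTimeline.filterCompletedInstants().lastInstant()
     if (lastCommit.isPresent) {
       val commitMetadata = HoodieCommitMetadata.fromBytes(
         metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(),
         classOf[HoodieCommitMetadata])
       val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
       if (StringUtils.nonEmpty(lastCheckpoint)) {
         val latestCommittedBatchId = lastCheckpoint.toLong
         return latestCommittedBatchId >= batchId
       }
     }
     false
   }
   ```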





[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1235395212

   ## CI report:
   
   * ef925520b421e774bb22f90f89b300ce70ca620a Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11085) 
   * 1d2a193ac4bf4df359d1f6f6de7a3ec4d427025a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1235399818

   ## CI report:
   
   * ef925520b421e774bb22f90f89b300ce70ca620a Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11085) 
   * 1d2a193ac4bf4df359d1f6f6de7a3ec4d427025a Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11121) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #6098: [HUDI-4389] Make HoodieStreamingSink idempotent

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6098:
URL: https://github.com/apache/hudi/pull/6098#issuecomment-1238305387

   ## CI report:
   
   * 1d2a193ac4bf4df359d1f6f6de7a3ec4d427025a Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=11121) 
   * 60e31e247652316f4b6116ad1ef1a4858d68107e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>

