Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/16 08:07:36 UTC

[GitHub] [hudi] YannByron opened a new pull request, #5885: [RFC-51][HUDI-3478] Hudi CDC

YannByron opened a new pull request, #5885:
URL: https://github.com/apache/hudi/pull/5885

   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contribute/how-to-contribute before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *(For example: This pull request adds quick-start document.)*
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1192636964

   @prasannarajaperumal @vinothchandar 
   I've updated this PR; please continue reviewing it, and please help trigger the full CI run.




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1198995760

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "1158585185",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] danny0405 commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
danny0405 commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1189949717

   > Can you think through the following scenarios and ensure things work as expected.
   > 
   > * Across clustering operations
   > * Across multi writer scenarios.
   > 
   > I am yet to review the MOR relation changes, the write handle changes themselves look good. tbh its cool that such major functionality can be implemented e2e with smaller LOC.
   > 
   > @danny0405 do you want to take a pass at this
   
   Yeah, I need to do a detailed review of this part.




[GitHub] [hudi] YannByron closed pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
YannByron closed pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi
URL: https://github.com/apache/hudi/pull/5885




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1204972984

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "1158585185",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "12ed2f325d43a5cfad4d62374611939c48f2f44b",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10577",
       "triggerID" : "12ed2f325d43a5cfad4d62374611939c48f2f44b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b41281075d76c23436f8e77380976009e9450cf2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b41281075d76c23436f8e77380976009e9450cf2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 12ed2f325d43a5cfad4d62374611939c48f2f44b Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10577) 
   * b41281075d76c23436f8e77380976009e9450cf2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1189740669

   > Can you think through the following scenarios and ensure things work as expected.
   > 
   > * Across clustering operations
   > * Across multi writer scenarios.
   
   The `TestCDCDataFrameSuite` unit test in this PR already covers the scenario across clustering operations. The multi-writer scenario is not covered, and I think it may not be necessary.
   




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r923299066


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that store the CDC data.
+   */
+  @Nullable
+  private String cdcPath;

Review Comment:
   Maybe `changeTrackingPath`? After all, it is a file path, not a stat.





[GitHub] [hudi] xushiyan commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
xushiyan commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1190327207

   > we seem not even make consensus on the design and i don't understand why the RFC doc was merged, confused firstly.
   
   @danny0405 I've explained this in https://github.com/apache/hudi/pull/5436#issuecomment-1184104410. We've also messaged @YannByron over WeChat about this, agreeing that we should make a follow-up PR to update the RFC doc. This was to accommodate some logistical matters, not to ignore any unresolved questions. We'll make sure the previous discussion points are linked and resolved in the updating PR.




[GitHub] [hudi] vinothchandar commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1189718047

   @YannByron Can we rework this by making CDC a special mode of inc query?




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r923292144


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -102,6 +117,15 @@
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  // a flag that indicate whether allow the change data to write out a cdc log file.
+  protected boolean cdcEnabled = false;

Review Comment:
   Hi @prasannarajaperumal,
   I tried creating the subclasses `HoodieChangeTrackingMergeHandle`, `HoodieChangeTrackingSortedMergeHandle` and `HoodieChangeTrackingConcatHandle`, and adding logic to decide whether a `HoodieChangeTrackingXXXHandle` should be created at every place where `HoodieMergeHandle` and the other classes were created before. I think that may be less clear.
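
The subclass variant discussed in this comment can be pictured with a minimal Java sketch. It is not code from the PR: `MergeHandleSketch`, `ChangeTrackingMergeHandleSketch` and `HandleFactorySketch` are hypothetical stand-ins, and the string lists only imitate real file writers. The point it tries to show is that every place that used to construct a handle would need the kind of branching done in `create`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a merge handle; not a Hudi class.
class MergeHandleSketch {
  final List<String> written = new ArrayList<String>();

  // Writes the merged record (here: just remembers it).
  void write(String key, String value) {
    written.add(key + "=" + value);
  }
}

// Hypothetical change-tracking subclass: same write path plus a change log.
class ChangeTrackingMergeHandleSketch extends MergeHandleSketch {
  final List<String> changeLog = new ArrayList<String>();

  @Override
  void write(String key, String value) {
    super.write(key, value);
    changeLog.add(key); // remember which key changed
  }
}

// Every creation site would need a decision like this one.
final class HandleFactorySketch {
  static MergeHandleSketch create(boolean cdcEnabled) {
    return cdcEnabled ? new ChangeTrackingMergeHandleSketch() : new MergeHandleSketch();
  }
}
```

With a single `cdcEnabled` flag inside the handle, as in the diff above, the creation sites stay unchanged and the branching lives in the write path instead.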





[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1158585185

   @hudi-bot run azure




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1198947565

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1186545393

   Hey @prasannarajaperumal, thank you very much for reviewing this.
   
   CDC is not a new concept; it is a common concept in databases. So I think it's better to distinguish CDC from `Incremental Query`. Some reasons:
   - CDC is better known than incremental query; `Incremental Query` is a term defined by Hudi.
   - Unlike `Incremental Query` and `Snapshot Query`, CDC has its own output format, in which every record has `op`, `ts_ms`, `before` and `after` fields.
   - According to RFC-51, CDC has its own read and write logic. We have to persist some additional information for CDC when data is written to Hudi.
   
   Looking forward to your reply.
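
For illustration only, the record layout named in this comment (`op`, `ts_ms`, `before`, `after`) could be modeled roughly as below. This is a minimal Java sketch under stated assumptions, not code from the PR: the class name `CdcRecordSketch`, the `Op` enum values, and the use of plain maps for the row images are all invented for the example.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of one CDC output record; not a Hudi class.
final class CdcRecordSketch {
  // Assumed operation kinds; the actual encoding in Hudi may differ.
  enum Op { INSERT, UPDATE, DELETE }

  final Op op;                      // kind of change
  final long tsMs;                  // timestamp of the change, in milliseconds
  final Map<String, Object> before; // row image before the change (null for inserts)
  final Map<String, Object> after;  // row image after the change (null for deletes)

  private CdcRecordSketch(Op op, long tsMs, Map<String, Object> before, Map<String, Object> after) {
    this.op = op;
    this.tsMs = tsMs;
    this.before = copyOrNull(before);
    this.after = copyOrNull(after);
  }

  static CdcRecordSketch insert(long tsMs, Map<String, Object> after) {
    return new CdcRecordSketch(Op.INSERT, tsMs, null, after);
  }

  static CdcRecordSketch update(long tsMs, Map<String, Object> before, Map<String, Object> after) {
    return new CdcRecordSketch(Op.UPDATE, tsMs, before, after);
  }

  static CdcRecordSketch delete(long tsMs, Map<String, Object> before) {
    return new CdcRecordSketch(Op.DELETE, tsMs, before, null);
  }

  // Defensive, read-only copy so later edits to the caller's map do not leak in.
  private static Map<String, Object> copyOrNull(Map<String, Object> row) {
    return row == null ? null : Collections.unmodifiableMap(new LinkedHashMap<String, Object>(row));
  }
}
```

A snapshot or incremental query would return only the current row, whereas a record shaped like this carries both images, which is why the write path has to persist extra information.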




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1208883214

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "1158585185",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "12ed2f325d43a5cfad4d62374611939c48f2f44b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10577",
       "triggerID" : "12ed2f325d43a5cfad4d62374611939c48f2f44b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b41281075d76c23436f8e77380976009e9450cf2",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10582",
       "triggerID" : "b41281075d76c23436f8e77380976009e9450cf2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b41281075d76c23436f8e77380976009e9450cf2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1208862487",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * b41281075d76c23436f8e77380976009e9450cf2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10582) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1198953622

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "1158585185",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r925110096


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +313,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      if (indexedRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) indexedRecord.get();
+        cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),

Review Comment:
   OK, I will use `hudi.common.util.collection.ExternalSpillableMap` here instead.
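
The general idea behind a spillable map, as opposed to collecting every change record in an in-memory list, can be sketched as follows. This is a toy Java illustration and an assumption about the motivation, not Hudi's `ExternalSpillableMap` and not its API: `SpillableBufferSketch`, its entry-count limit, and its temp-file layout are all hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy buffer: keeps a bounded number of entries in memory, spills the rest to a temp file.
// Values are assumed to be non-null.
final class SpillableBufferSketch implements AutoCloseable {
  private final int maxInMemoryEntries;
  private final Map<String, String> inMemory = new HashMap<String, String>();
  // key -> {offset, length} of the latest value written to the spill file
  private final Map<String, long[]> spilledIndex = new HashMap<String, long[]>();
  private final File spillFile;
  private final RandomAccessFile spill;

  SpillableBufferSketch(int maxInMemoryEntries) throws IOException {
    this.maxInMemoryEntries = maxInMemoryEntries;
    this.spillFile = File.createTempFile("cdc-spill-sketch", ".bin");
    this.spillFile.deleteOnExit();
    this.spill = new RandomAccessFile(spillFile, "rw");
  }

  void put(String key, String value) throws IOException {
    boolean fitsInMemory = !spilledIndex.containsKey(key) && inMemory.size() < maxInMemoryEntries;
    if (inMemory.containsKey(key) || fitsInMemory) {
      inMemory.put(key, value);
      return;
    }
    // No room left (or the key already lives on disk): append and point the index at the new bytes.
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    long offset = spill.length();
    spill.seek(offset);
    spill.write(bytes);
    spilledIndex.put(key, new long[] {offset, bytes.length});
  }

  String get(String key) throws IOException {
    String value = inMemory.get(key);
    if (value != null) {
      return value;
    }
    long[] location = spilledIndex.get(key);
    if (location == null) {
      return null; // unknown key
    }
    byte[] bytes = new byte[(int) location[1]];
    spill.seek(location[0]);
    spill.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  int inMemorySize() {
    return inMemory.size();
  }

  int spilledSize() {
    return spilledIndex.size();
  }

  @Override
  public void close() throws IOException {
    spill.close();
    spillFile.delete();
  }
}
```

The sketch bounds memory by entry count for simplicity; a production structure would more likely bound it by estimated bytes.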





[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1205115532

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445",
       "triggerID" : "1158585185",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "12ed2f325d43a5cfad4d62374611939c48f2f44b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10577",
       "triggerID" : "12ed2f325d43a5cfad4d62374611939c48f2f44b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b41281075d76c23436f8e77380976009e9450cf2",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10582",
       "triggerID" : "b41281075d76c23436f8e77380976009e9450cf2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b41281075d76c23436f8e77380976009e9450cf2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10582) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r923039487


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, HoodieDataSourceHelper, HoodieTableSchema, SparkAdapterSupport}
+import org.apache.hudi.HoodieConversionUtils._
+import org.apache.hudi.common.table.cdc.CDCFileTypeEnum._
+import org.apache.hudi.common.table.cdc.CDCUtils._
+import org.apache.hudi.common.table.cdc.CDCOperationEnum._
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFileFormat, HoodieFileGroupId, HoodieLogFile, HoodieReplaceCommitMetadata, HoodieWriteStat, WriteOperationType}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline._
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
+import org.apache.hudi.internal.schema.InternalSchema
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+class CDCRelation(
+    override val sqlContext: SQLContext,
+    metaClient: HoodieTableMetaClient,
+    cdcSupplementalLogging: Boolean,
+    startInstant: String,
+    endInstant: String,
+    options: Map[String, String]
+) extends BaseRelation with PrunedFilteredScan with Logging {
+
+  val spark: SparkSession = sqlContext.sparkSession
+
+  val fs: FileSystem = metaClient.getFs.getFileSystem
+
+  val basePath: Path = metaClient.getBasePathV2
+
+  val (tableAvroSchema, _) = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
+      case Success(schema) => schema
+      case Failure(e) =>
+        throw new IllegalArgumentException("Failed to fetch schema from the table", e)
+    }
+    // try to find internalSchema
+    val internalSchemaFromMeta = try {
+      schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
+    } catch {
+      case _: Exception => InternalSchema.getEmptyInternalSchema
+    }
+    (avroSchema, internalSchemaFromMeta)
+  }
+
+  val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+  val commits: Map[HoodieInstant, HoodieCommitMetadata] =
+    CDCRelation.getCompletedCommitInstantInSpecifiedRange(metaClient, startInstant, endInstant)
+
+  /**
+   * Parse the commit metadata between (startInstant, endInstant], and extract the touched partitions
+   * and files to build the filesystem view.
+   */
+  lazy val fsView: HoodieTableFileSystemView = {
+    val touchedPartition = commits.flatMap { case (_, commitMetadata) =>
+      val partitionSet = commitMetadata.getPartitionToWriteStats.keySet()
+      val replacedPartitionSet = commitMetadata match {
+        case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+          replaceCommitMetadata.getPartitionToReplaceFileIds.keySet().asScala
+        case _ => Set.empty[String]
+      }
+      partitionSet.asScala ++ replacedPartitionSet
+    }.toSet
+    val touchedFiles = touchedPartition.flatMap { partition =>
+      val partitionPath = FSUtils.getPartitionPath(basePath, partition)
+      fs.listStatus(partitionPath)
+    }.toArray
+    new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline.filterCompletedInstants, touchedFiles)
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between each commit/instant and changes to this file group.
+   */
+  val changeFilesForPerFileGroupAndCommit: Map[HoodieFileGroupId, HoodieCDCFileGroupSplit] = {
+    val fgToCommitChanges = mutable.Map.empty[HoodieFileGroupId,
+      mutable.Map[HoodieInstant, ChangeFileForSingleFileGroupAndCommit]]
+
+    commits.foreach {
+      case (instant, commitMetadata) =>
+        // parse `partitionToWriteStats` in the metadata of commit
+        commitMetadata.getPartitionToWriteStats.asScala.foreach {
+          case (partition, hoodieWriteStats) =>
+            hoodieWriteStats.asScala.foreach { writeStat =>
+              val fileGroupId = new HoodieFileGroupId(partition, writeStat.getFileId)
+              // Identify the CDC source involved in this commit and
+              // determine its type for subsequent loading using different methods.
+              val changeFile = parseWriteStat(fileGroupId, instant, writeStat,
+                commitMetadata.getOperationType == WriteOperationType.DELETE)
+              if (fgToCommitChanges.contains(fileGroupId)) {
+                fgToCommitChanges(fileGroupId)(instant) = changeFile
+              } else {
+                fgToCommitChanges.put(fileGroupId, mutable.Map(instant -> changeFile))
+              }
+            }
+        }
+
+        // parse `partitionToReplaceFileIds` in the metadata of commit
+        commitMetadata match {
+          case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+            replaceCommitMetadata.getPartitionToReplaceFileIds.asScala.foreach {
+              case (partition, fileIds) =>
+                fileIds.asScala.foreach { fileId =>
+                  toScalaOption(fsView.fetchLatestFileSlice(partition, fileId)).foreach {
+                    fileSlice =>
+                      val fileGroupId = new HoodieFileGroupId(partition, fileId)
+                      val changeFile =
+                        ChangeFileForSingleFileGroupAndCommit(REPLACED_FILE_GROUP, null, Some(fileSlice))
+                      if (fgToCommitChanges.contains(fileGroupId)) {
+                        fgToCommitChanges(fileGroupId)(instant) = changeFile
+                      } else {
+                        fgToCommitChanges.put(fileGroupId, mutable.Map(instant -> changeFile))
+                      }
+                  }
+                }
+            }
+          case _ =>
+        }
+      case _ =>
+    }
+    fgToCommitChanges.map { case (fgId, instantToChanges) =>
+      (fgId, HoodieCDCFileGroupSplit(instantToChanges.toArray.sortBy(_._1)))
+    }.toMap
+  }
+
+  override final def needConversion: Boolean = false
+
+  override def schema: StructType = CDCRelation.CDC_SPARK_SCHEMA
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val internalRows = buildScan0(requiredColumns, filters)
+    internalRows.asInstanceOf[RDD[Row]]
+  }
+
+  def buildScan0(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
+    val nameToField = schema.fields.map(f => f.name -> f).toMap
+    val requiredSchema = StructType(requiredColumns.map(nameToField))
+    val originTableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
+      sparkSession = spark,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = tableStructSchema,
+      filters = Nil,
+      options = options,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val cdcRdd = new HoodieCDCRDD(
+      spark,
+      metaClient,
+      cdcSupplementalLogging,
+      parquetReader,
+      originTableSchema,
+      schema,
+      requiredSchema,
+      changeFilesForPerFileGroupAndCommit.values.toArray
+    )
+    cdcRdd.asInstanceOf[RDD[InternalRow]]
+  }
+
+  /**
+   * Parse HoodieWriteStat, judge which type the file is, and what strategy should be used to parse CDC data.
+   * Then build a [[ChangeFileForSingleFileGroupAndCommit]] object.
+   */
+  private def parseWriteStat(

Review Comment:
   Yes, let me move this to a common place.





[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1204845643

   ## CI report:
   
   * 2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445) 
   * 12ed2f325d43a5cfad4d62374611939c48f2f44b Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10577) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] vinothchandar commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1183925847

   @YannByron made one pass to understand the file changes. Will start the detailed review next




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r923035309


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -102,6 +117,15 @@
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  // a flag that indicate whether allow the change data to write out a cdc log file.
+  protected boolean cdcEnabled = false;

Review Comment:
   I think you mean `HoodieChangeTrackingMergeHandle`?





[GitHub] [hudi] XuQianJin-Stars commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
XuQianJin-Stars commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1196360570

   @hudi-bot




[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1162775106

   Update:
   CDC is now supported when `cdc.supplemental.logging` is disabled. In this case, the CDC block persists only the `op` and `record_key` fields.
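
The behaviour described in this update can be illustrated with a minimal sketch. It assumes the record is a simple map, and that the fields added when supplemental logging is enabled are called `before` and `after`; the class name, method name, those two field names, and the `"u"` operation code are hypothetical and are not taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not Hudi's API: shows which fields a CDC log record
// would carry depending on the supplemental-logging flag.
final class CdcRecordSketch {
  private CdcRecordSketch() {}

  static Map<String, Object> cdcRecord(
      boolean supplementalLogging, String op, String recordKey, Object before, Object after) {
    Map<String, Object> record = new LinkedHashMap<>();
    // These two fields are always persisted, per the comment above.
    record.put("op", op);
    record.put("record_key", recordKey);
    if (supplementalLogging) {
      // Assumed field names for the old and new row images.
      record.put("before", before);
      record.put("after", after);
    }
    return record;
  }

  public static void main(String[] args) {
    System.out.println(cdcRecord(false, "u", "key-1", "old", "new"));
    System.out.println(cdcRecord(true, "u", "key-1", "old", "new"));
  }
}
```

With the flag off, a reader would presumably have to recover the row images from the data files themselves, which is the trade-off between log size and read cost.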




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r925112498


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -229,6 +230,10 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
+        if (logBlock.getBlockType() == CDC_DATA_BLOCK) {

Review Comment:
   OK, let me add some unit tests.





[GitHub] [hudi] prasannarajaperumal commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1186787667

   I understand CDC is a database concept. My point was that an incremental query is also just a form of CDC if you think about how it is used. Yes, the schema differs depending on the mode of the incremental query. I believe we can unify the current CDC proposal and the incremental query feature to make it simpler for users to consume change streams from a Hudi table.




[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1186851527

   @prasannarajaperumal 
   Got it. In some respects, an incremental query can be considered a kind of CDC. An incremental query just returns the data inserted or updated after a certain point. It uses the normal format and only cares about the new values, not the old ones.




[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1160618743

   Please review this. The case where `cdc.supplemental.logging` is false, and Spark SQL syntax, may be supported in subsequent commits or in another PR.




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1204967298

   ## CI report:
   
   * 12ed2f325d43a5cfad4d62374611939c48f2f44b Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10577) 
   




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1204978466

   ## CI report:
   
   * 12ed2f325d43a5cfad4d62374611939c48f2f44b Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10577) 
   * b41281075d76c23436f8e77380976009e9450cf2 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10582) 
   




[GitHub] [hudi] melin commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
melin commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1182670254

   @YannByron  cc




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r925112104


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +240,40 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files belonging to this
+   * provided file group.
+   */
+  public static Pair<String, List<String>> getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   No. Let me do this first. There is no simple, low-code way to do this; I think it deserves a new PR.





[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r923045845


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +313,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      if (indexedRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) indexedRecord.get();
+        cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),

Review Comment:
   IMO, it's OK. 
   A base Parquet file is about 128 MB in most common cases. Even if all the records are updated, `cdcData` will take less than about 300 MB of memory. And if the workload is heavy, users can increase the memory of the workers.
   But if we are worried about this, we can use `hudi.common.util.collection.ExternalSpillableMap` instead.
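
As an illustration of bounding the memory held by the buffered change records, here is a minimal sketch of a size-capped buffer that hands its contents to a flush callback once a byte budget is reached. This is deliberately not `ExternalSpillableMap` (whose API is not shown in this thread); the class name, the byte estimate, and the callback are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: caps in-memory change records by flushing in batches.
final class BoundedCdcBufferSketch {
  private final long maxBytes;
  private final Consumer<List<String>> flusher;
  private final List<String> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  BoundedCdcBufferSketch(long maxBytes, Consumer<List<String>> flusher) {
    this.maxBytes = maxBytes;
    this.flusher = flusher;
  }

  void add(String record) {
    buffer.add(record);
    // Rough estimate only: two bytes per UTF-16 char, ignoring object overhead.
    bufferedBytes += record.length() * 2L;
    if (bufferedBytes >= maxBytes) {
      flush();
    }
  }

  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Hand over a copy so the callback can keep the batch after we clear.
    flusher.accept(new ArrayList<>(buffer));
    buffer.clear();
    bufferedBytes = 0;
  }
}
```

A writer would call `add` for every change record and `flush` once when the handle closes, so that peak memory stays near the configured budget regardless of how many records in the base file are updated.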





[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1198950658

   ## CI report:
   
   * 2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445) 
   




[GitHub] [hudi] YannByron commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1223521172

   Reopen: https://github.com/apache/hudi/pull/6476




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1208886160

   ## CI report:
   
   * b41281075d76c23436f8e77380976009e9450cf2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10582) 
   




[GitHub] [hudi] YannByron commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1189748888

   > @YannByron Can we rework this by making CDC a special mode of inc query?
   
   Actually, I think the incremental query is a special mode of CDC. The incremental query keeps the normal format and returns less change information than CDC does.
   But it is fine. If we want to do this, we just need to modify the configs that users have to be aware of, like this:
   
   The current way sets:
   `hoodie.datasource.query.type=cdc`
   
   The modified way sets:
   ```
   hoodie.datasource.query.type=incremental
   hoodie.datasource.incremental.output=cdc
   ```
   
   Is that OK?
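
To make the two proposals concrete, the following is a minimal sketch of how a reader could decide whether a set of options requests CDC output under either scheme. Only the two option keys and their values come from the comment above; the class and method names are hypothetical, and this is not how the PR resolves options.

```java
import java.util.Map;

// Hypothetical resolver for the two configuration styles discussed above.
final class CdcQueryOptionsSketch {
  static final String QUERY_TYPE = "hoodie.datasource.query.type";
  static final String INCREMENTAL_OUTPUT = "hoodie.datasource.incremental.output";

  private CdcQueryOptionsSketch() {}

  static boolean isCdcQuery(Map<String, String> options) {
    String queryType = options.getOrDefault(QUERY_TYPE, "");
    // Current way: a dedicated query type.
    if ("cdc".equals(queryType)) {
      return true;
    }
    // Modified way: CDC as an output mode of the incremental query.
    return "incremental".equals(queryType)
        && "cdc".equals(options.get(INCREMENTAL_OUTPUT));
  }
}
```

Under the modified scheme an incremental query without the output option keeps its existing behaviour, which is the main compatibility argument for folding CDC into the incremental query type.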




[GitHub] [hudi] XuQianJin-Stars commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
XuQianJin-Stars commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1196360427

   hudi-bot




[GitHub] [hudi] prasannarajaperumal commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1186159978

   Hey @YannByron ,
   
   Thanks for this PR and a well-written [RFC-51](https://github.com/apache/hudi/blob/master/rfc/rfc-51/rfc-51.md).
   Overall I agree with the high-level direction. I will do the code review soon. I have a question before that. 
   
   Should we introduce a new concept (CDC) here on Hudi tables? I think this should be a sub-mode of incremental query. 
   For illustration, suppose we have something like the following modes for incremental query (change stream):
   - LATEST_STATE_INSERT_DELETE_KEYS (entire row state for all inserted keys and empty delete keys?)
   - LATEST_STATE_ONLY_INSERT_KEYS
   - MIN_STATE_CHANGE_INSERT_DELETE_KEYS (only the changed columns, consolidating multiple inserts and deletes, or removing data inserted and deleted within the time range)
   - ALL_STATE_CHANGES_INSERT_DELETE_KEYS (include every single change made to the key)
   
   I think read-schema changes for the CDC-style incremental queries could be a challenge. 
   
   The reasons I think we should converge incremental queries with RFC-51 are:
   - It removes the limitation of tracking deletes across compaction boundaries for incremental queries
   - I think it just makes sense for us to track, by default for all Hudi tables, the data we track when "cdc.supplemental.logging=false". Having this data stored efficiently for point lookups will help with record merging as well, I suppose?
   
   @YannByron What do you think? (cc @vinothchandar )
   
   Cheers
   Prasanna
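
The four illustrative modes listed in this comment could be modelled as an enum. The sketch below is only one possible reading: the mode names come from the comment, while the two boolean properties and their values are assumptions inferred from the names and are not part of any Hudi API.

```java
// Hypothetical enum for the change-stream modes floated in the comment above.
enum ChangeStreamModeSketch {
  LATEST_STATE_INSERT_DELETE_KEYS(true, false),
  LATEST_STATE_ONLY_INSERT_KEYS(false, false),
  MIN_STATE_CHANGE_INSERT_DELETE_KEYS(true, false),
  ALL_STATE_CHANGES_INSERT_DELETE_KEYS(true, true);

  // Whether deleted keys appear in the output (assumed from the mode name).
  final boolean includesDeletes;
  // Whether every intermediate change is emitted rather than a consolidated one.
  final boolean emitsEveryChange;

  ChangeStreamModeSketch(boolean includesDeletes, boolean emitsEveryChange) {
    this.includesDeletes = includesDeletes;
    this.emitsEveryChange = emitsEveryChange;
  }
}
```

Expressing the modes this way makes the read-schema concern visible: each combination of properties may imply a different output schema for the same table.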
   




[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r922989756


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -102,6 +117,15 @@
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  // a flag that indicate whether allow the change data to write out a cdc log file.
+  protected boolean cdcEnabled = false;

Review Comment:
   Create a sub-class of HoodieAppendHandle - HoodieChangeTrackingAppendHandle and move all the code related to persisting row-level change tracking metadata to the subclass. I prefer naming all methods/parameters as changeTracking instead of CDC. CDC is a feature, ChangeTracking is the action you do during write. 
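
The refactoring suggested here can be sketched as a base handle that exposes a no-op hook and a subclass that overrides it to collect change records. All class, method, and field names below are hypothetical stand-ins, records are simplified to strings, and the `"u"` prefix is an assumed operation code; this is not the structure of the actual Hudi handles.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base handle: writes records and exposes a hook for subclasses.
class MergeHandleSketch {
  final List<String> written = new ArrayList<>();

  boolean writeUpdate(String key, String oldValue, String newValue) {
    written.add(key + "=" + newValue);
    onRecordUpdated(key, oldValue, newValue);
    return true;
  }

  // No-op by default, so the base class carries no change-tracking state.
  protected void onRecordUpdated(String key, String oldValue, String newValue) {
  }
}

// Hypothetical subclass holding everything related to change tracking.
class ChangeTrackingMergeHandleSketch extends MergeHandleSketch {
  final List<String> changes = new ArrayList<>();

  @Override
  protected void onRecordUpdated(String key, String oldValue, String newValue) {
    changes.add("u:" + key + ":" + oldValue + "->" + newValue);
  }
}
```

With this split, a flag such as `cdcEnabled` would no longer be needed inside the base class; the writer would simply instantiate the tracking subclass when change tracking is switched on for the table.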



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, HoodieDataSourceHelper, HoodieTableSchema, SparkAdapterSupport}
+import org.apache.hudi.HoodieConversionUtils._
+import org.apache.hudi.common.table.cdc.CDCFileTypeEnum._
+import org.apache.hudi.common.table.cdc.CDCUtils._
+import org.apache.hudi.common.table.cdc.CDCOperationEnum._
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFileFormat, HoodieFileGroupId, HoodieLogFile, HoodieReplaceCommitMetadata, HoodieWriteStat, WriteOperationType}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline._
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
+import org.apache.hudi.internal.schema.InternalSchema
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+class CDCRelation(
+    override val sqlContext: SQLContext,
+    metaClient: HoodieTableMetaClient,
+    cdcSupplementalLogging: Boolean,
+    startInstant: String,
+    endInstant: String,
+    options: Map[String, String]
+) extends BaseRelation with PrunedFilteredScan with Logging {
+
+  val spark: SparkSession = sqlContext.sparkSession
+
+  val fs: FileSystem = metaClient.getFs.getFileSystem
+
+  val basePath: Path = metaClient.getBasePathV2
+
+  val (tableAvroSchema, _) = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
+      case Success(schema) => schema
+      case Failure(e) =>
+        throw new IllegalArgumentException("Failed to fetch schema from the table", e)
+    }
+    // try to find internalSchema
+    val internalSchemaFromMeta = try {
+      schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
+    } catch {
+      case _: Exception => InternalSchema.getEmptyInternalSchema
+    }
+    (avroSchema, internalSchemaFromMeta)
+  }
+
+  val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+  val commits: Map[HoodieInstant, HoodieCommitMetadata] =
+    CDCRelation.getCompletedCommitInstantInSpecifiedRange(metaClient, startInstant, endInstant)
+
+  /**
+   * Parse the commit metadata between (startInstant, endInstant], and extract the touched partitions
+   * and files to build the filesystem view.
+   */
+  lazy val fsView: HoodieTableFileSystemView = {
+    val touchedPartition = commits.flatMap { case (_, commitMetadata) =>
+      val partitionSet = commitMetadata.getPartitionToWriteStats.keySet()
+      val replacedPartitionSet = commitMetadata match {
+        case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+          replaceCommitMetadata.getPartitionToReplaceFileIds.keySet().asScala
+        case _ => Set.empty[String]
+      }
+      partitionSet.asScala ++ replacedPartitionSet
+    }.toSet
+    val touchedFiles = touchedPartition.flatMap { partition =>
+      val partitionPath = FSUtils.getPartitionPath(basePath, partition)
+      fs.listStatus(partitionPath)
+    }.toArray
+    new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline.filterCompletedInstants, touchedFiles)
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between each commit/instant and changes to this file group.
+   */
+  val changeFilesForPerFileGroupAndCommit: Map[HoodieFileGroupId, HoodieCDCFileGroupSplit] = {
+    val fgToCommitChanges = mutable.Map.empty[HoodieFileGroupId,
+      mutable.Map[HoodieInstant, ChangeFileForSingleFileGroupAndCommit]]
+
+    commits.foreach {
+      case (instant, commitMetadata) =>
+        // parse `partitionToWriteStats` in the metadata of commit
+        commitMetadata.getPartitionToWriteStats.asScala.foreach {
+          case (partition, hoodieWriteStats) =>
+            hoodieWriteStats.asScala.foreach { writeStat =>
+              val fileGroupId = new HoodieFileGroupId(partition, writeStat.getFileId)
+              // Identify the CDC source involved in this commit and
+              // determine its type for subsequent loading using different methods.
+              val changeFile = parseWriteStat(fileGroupId, instant, writeStat,
+                commitMetadata.getOperationType == WriteOperationType.DELETE)
+              if (fgToCommitChanges.contains(fileGroupId)) {
+                fgToCommitChanges(fileGroupId)(instant) = changeFile
+              } else {
+                fgToCommitChanges.put(fileGroupId, mutable.Map(instant -> changeFile))
+              }
+            }
+        }
+
+        // parse `partitionToReplaceFileIds` in the metadata of commit
+        commitMetadata match {
+          case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+            replaceCommitMetadata.getPartitionToReplaceFileIds.asScala.foreach {
+              case (partition, fileIds) =>
+                fileIds.asScala.foreach { fileId =>
+                  toScalaOption(fsView.fetchLatestFileSlice(partition, fileId)).foreach {
+                    fileSlice =>
+                      val fileGroupId = new HoodieFileGroupId(partition, fileId)
+                      val changeFile =
+                        ChangeFileForSingleFileGroupAndCommit(REPLACED_FILE_GROUP, null, Some(fileSlice))
+                      if (fgToCommitChanges.contains(fileGroupId)) {
+                        fgToCommitChanges(fileGroupId)(instant) = changeFile
+                      } else {
+                        fgToCommitChanges.put(fileGroupId, mutable.Map(instant -> changeFile))
+                      }
+                  }
+                }
+            }
+          case _ =>
+        }
+      case _ =>
+    }
+    fgToCommitChanges.map { case (fgId, instantToChanges) =>
+      (fgId, HoodieCDCFileGroupSplit(instantToChanges.toArray.sortBy(_._1)))
+    }.toMap
+  }
+
+  override final def needConversion: Boolean = false
+
+  override def schema: StructType = CDCRelation.CDC_SPARK_SCHEMA
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val internalRows = buildScan0(requiredColumns, filters)
+    internalRows.asInstanceOf[RDD[Row]]
+  }
+
+  def buildScan0(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
+    val nameToField = schema.fields.map(f => f.name -> f).toMap
+    val requiredSchema = StructType(requiredColumns.map(nameToField))
+    val originTableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
+      sparkSession = spark,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = tableStructSchema,
+      filters = Nil,
+      options = options,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val cdcRdd = new HoodieCDCRDD(
+      spark,
+      metaClient,
+      cdcSupplementalLogging,
+      parquetReader,
+      originTableSchema,
+      schema,
+      requiredSchema,
+      changeFilesForPerFileGroupAndCommit.values.toArray
+    )
+    cdcRdd.asInstanceOf[RDD[InternalRow]]
+  }
+
+  /**
+   * Parse HoodieWriteStat, judge which type the file is, and what strategy should be used to parse CDC data.
+   * Then build a [[ChangeFileForSingleFileGroupAndCommit]] object.
+   */
+  private def parseWriteStat(

Review Comment:
   Does it make sense to generalize this out of Spark and make the logic to identify the different CDC types and load them common to all clients?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCFileTypeEnum.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+/**
+ * Here define four cdc file types. The different cdc file type will decide which file will be
+ * used to extract the change data, and how to do this.
+ *
+ * CDC_LOG_FILE:
+ *   For this type, there must be a real cdc log file from which we get the whole/part change data.
+ *   when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the
+ *   change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly,
+ *   no more other files need to be loaded.
+ *   when `hoodie.table.cdc.supplemental.logging` is false, it just keep the `op` and the key of
+ *   the changing record. When `op` is equal to 'i', `before` is null and get the current record
+ *   from the current base/log file as `after`. When `op` is equal to 'u', get the previous
+ *   record from the previous file slice as `before`, and get the current record from the
+ *   current base/log file `after`. When `op` is equal to 'd', get the previous record from
+ *   the previous file slice as `before`, and `after` is null.
+ *
+ * ADD_BASE_FILE:
+ *   For this type, there must be a base file at the current instant. All the records from this
+ *   file is new-coming, so we can load this, mark all the records with `i`, and treat them as
+ *   the value of `after`. The value of `before` for each record is null.
+ *
+ * REMOVE_BASE_FILE:
+ *   For this type, there must be an empty file at the current instant, but a non-empty base file
+ *   at the previous instant. First we find this base file that has the same file group and belongs
+ *   to the previous instant. Then load this, mark all the records with `d`, and treat them as
+ *   the value of `before`. The value of `after` for each record is null.
+ *
+ * MOR_LOG_FILE:
+ *   For this type, a normal log file of mor table will be used. First we need to load the previous
+ *   file slice(including the base file and other log files in the same file group). Then for each
+ *   record from the log file, get the key of this, and execute the following steps:
+ *     1) if the record is deleted,
+ *       a) if there is a record with the same key in the data loaded, `op` is 'd', 'before' is the
+ *          record from the data loaded, `after` is null;
+ *       b) if there is not a record with the same key in the data loaded, just skip.
+ *     2) the record is not deleted,
+ *       a) if there is a record with the same key in the data loaded, `op` is 'u', 'before' is the
+ *          record from the data loaded, `after` is the current record;
+ *       b) if there is not a record with the same key in the data loaded, `op` is 'i', 'before' is
+ *          null, `after` is the current record;
+ *
+ * REPLACED_FILE_GROUP:
+ *   For this type, it must be a replacecommit, like INSERT_OVERWRITE and DROP_PARTITION. It drops
+ *   a whole file group. First we find this file group. Then load this, mark all the records with
+ *   `d`, and treat them as the value of `before`. The value of `after` for each record is null.
+ */
+public enum CDCFileTypeEnum {
+
+  CDC_LOG_FILE,
+  ADD_BASE_File,

Review Comment:
   s/ADD_BASE_File/ADD_BASE_FILE
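
The `MOR_LOG_FILE` rules in the Javadoc above amount to a small decision table keyed on two facts: whether the log entry is a delete, and whether the previous file slice already holds the key. A minimal Java sketch of that table follows. The class `MorLogCdcSketch`, its `Change` holder, and the use of plain strings for records are hypothetical simplifications for illustration, not code from this PR.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of the MOR_LOG_FILE rules; not part of the PR.
final class MorLogCdcSketch {

  /** One change event: op is "i", "u" or "d"; before/after may be null. */
  static final class Change {
    final String op;
    final String before;
    final String after;

    Change(String op, String before, String after) {
      this.op = op;
      this.before = before;
      this.after = after;
    }
  }

  /**
   * @param previousSlice key -> record loaded from the previous file slice
   * @param key           record key read from the log file
   * @param current       record read from the log file, or null if the entry is a delete
   * @return the change to emit, or empty when the entry should be skipped
   */
  static Optional<Change> infer(Map<String, String> previousSlice, String key, String current) {
    String before = previousSlice.get(key);
    if (current == null) {
      // 1) deleted record: emit 'd' only if the key existed before, otherwise skip
      return before == null
          ? Optional.empty()
          : Optional.of(new Change("d", before, null));
    }
    // 2) live record: 'u' if the key existed before, otherwise 'i'
    return Optional.of(before == null
        ? new Change("i", null, current)
        : new Change("u", before, current));
  }
}
```

A real reader would presumably also update the loaded data after each log record so that later records in the same log file see the new state; the sketch leaves that out.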



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +313,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      if (indexedRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) indexedRecord.get();
+        cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),

Review Comment:
   When supplemental logging is enabled, we will be holding the record data in memory until the handle is closed. Are there any side effects to be cautious about?
   We deflate the actual record once it's written to the file, and the bloom filter calculation happens after that. Would there be significant memory pressure if we still hold on to the data for CDC, and how do we handle this?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that store the CDC data.
+   */
+  @Nullable
+  private String cdcPath;

Review Comment:
   ChangeTrackingStat





[GitHub] [hudi] vinothchandar commented on pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1189692799

   +1 on doing this as a part of the incremental query. 




[GitHub] [hudi] YannByron commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r925109942


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that store the CDC data.
+   */
+  @Nullable
+  private String cdcPath;

Review Comment:
   Let me test the case where an old Hudi version queries tables created by this branch.





[GitHub] [hudi] vinothchandar commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r925085937


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +447,57 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected GenericData.Record cdcRecord(CDCOperationEnum operation, String recordKey, String partitionPath,

Review Comment:
   RFC-46 is moving away from GenericRecord as the canonical data record. So we may want to move in that direction as well. We need to sequence the two efforts correctly. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -229,6 +230,10 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
+        if (logBlock.getBlockType() == CDC_DATA_BLOCK) {

Review Comment:
   If the data block or the commit is rolled back, is the CDC block skipped correctly? Can we write some tests to cover these scenarios?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that store the CDC data.
+   */
+  @Nullable
+  private String cdcPath;

Review Comment:
   Do these new fields evolve well, i.e. are they backwards compatible with existing write stats that lack them?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +240,40 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files belonging to this
+   * provided file group.
+   */
+  public static Pair<String, List<String>> getFileSliceForFileGroupFromDeltaCommit(
+      byte[] bytes, HoodieFileGroupId fileGroupId)
+      throws Exception {
+    String jsonStr = new String(bytes, StandardCharsets.UTF_8);
+    if (jsonStr.isEmpty()) {
+      return null;

Review Comment:
   Please avoid using `null` as the return value.
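
One common way to address this is to make the "nothing found" case explicit in the return type. The sketch below is a hedged illustration only: it uses `java.util.Optional` as a stand-in for whichever option type the project prefers, and `FileSliceLookupSketch.decodeCommitJson` is a hypothetical, simplified helper covering just the empty-payload branch shown in the diff.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-in for the empty-payload branch; not the PR's method.
final class FileSliceLookupSketch {

  /** Returns the decoded commit JSON, or Optional.empty() when the payload is empty. */
  static Optional<String> decodeCommitJson(byte[] bytes) {
    String jsonStr = new String(bytes, StandardCharsets.UTF_8);
    // The caller has to handle the empty case explicitly instead of null-checking.
    return jsonStr.isEmpty() ? Optional.empty() : Optional.of(jsonStr);
  }

  public static void main(String[] args) {
    System.out.println(decodeCommitJson(new byte[0]).isPresent());
    System.out.println(decodeCommitJson("{}".getBytes(StandardCharsets.UTF_8)).isPresent());
  }
}
```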



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -114,6 +118,8 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
                               HoodieTable<T, I, K, O> hoodieTable, Option<Schema> overriddenSchema,
                               TaskContextSupplier taskContextSupplier) {
     super(config, Option.of(instantTime), hoodieTable);
+    this.keyFiled = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD

Review Comment:
   typo: keyField



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +240,40 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files belonging to this
+   * provided file group.
+   */
+  public static Pair<String, List<String>> getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   Doesn't any of the existing code already do this?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +313,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      if (indexedRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) indexedRecord.get();
+        cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),

Review Comment:
   In general, with all the Java/JVM overhead, I think it'll comfortably be more than 300M. Can we use the spillable map here instead, in this PR?
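
To illustrate the idea behind the request (presumably referring to Hudi's `ExternalSpillableMap`), here is a self-contained Java sketch of a buffer that keeps records in memory up to a byte budget and writes the rest to a temporary file. Everything in it is an assumption made for illustration: the class `SpillingCdcBufferSketch`, the string-encoded records (which must not contain newlines), and the line-per-record spill format. It does not use or reproduce Hudi's API.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a memory-bounded CDC buffer; not Hudi's implementation.
final class SpillingCdcBufferSketch implements AutoCloseable {
  private final long maxInMemoryBytes;
  private final List<String> inMemory = new ArrayList<>();
  private long inMemoryBytes = 0;
  private Path spillFile; // created lazily on first spill
  private int spilledCount = 0;

  SpillingCdcBufferSketch(long maxInMemoryBytes) {
    this.maxInMemoryBytes = maxInMemoryBytes;
  }

  /** Adds a record; once the budget is exceeded, this and all later records go to disk. */
  void add(String record) {
    long size = record.getBytes(StandardCharsets.UTF_8).length;
    if (spillFile == null && inMemoryBytes + size <= maxInMemoryBytes) {
      inMemory.add(record);
      inMemoryBytes += size;
      return;
    }
    try {
      if (spillFile == null) {
        spillFile = Files.createTempFile("cdc-spill", ".txt");
      }
      try (BufferedWriter writer = Files.newBufferedWriter(
          spillFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
        writer.write(record);
        writer.newLine();
      }
      spilledCount++;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns all records in insertion order: in-memory ones first, then spilled ones. */
  List<String> readAll() {
    List<String> all = new ArrayList<>(inMemory);
    if (spillFile != null) {
      try {
        all.addAll(Files.readAllLines(spillFile, StandardCharsets.UTF_8));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return all;
  }

  int spilledCount() {
    return spilledCount;
  }

  /** Deletes the spill file, if one was created. */
  @Override
  public void close() {
    try {
      if (spillFile != null) {
        Files.deleteIfExists(spillFile);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Reopening the spill file on every `add` keeps the sketch short; a real implementation would keep the writer open and size records by their serialized form rather than by string length.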





[GitHub] [hudi] XuQianJin-Stars closed pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
XuQianJin-Stars closed pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi
URL: https://github.com/apache/hudi/pull/5885




[GitHub] [hudi] XuQianJin-Stars commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
XuQianJin-Stars commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1196363490

   ## CI report:
   
   * 5e175012e7e80e454f7ed0e2b93b003b0f3572e2 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10354) 
   




[GitHub] [hudi] hudi-bot commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1204841423

   ## CI report:
   
   * 2d407c1c3bae26425b0c8bc6d5d47c7f385dfb0b Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=10445) 
   * 12ed2f325d43a5cfad4d62374611939c48f2f44b UNKNOWN
   




[GitHub] [hudi] YannByron commented on pull request #5885: [HUDI-3478] Support CDC for Spark in Hudi

Posted by GitBox <gi...@apache.org>.
YannByron commented on PR #5885:
URL: https://github.com/apache/hudi/pull/5885#issuecomment-1208862487

   @hudi-bot run azure 

