You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/31 00:57:06 UTC

[GitHub] [hudi] xushiyan opened a new pull request, #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

xushiyan opened a new pull request, #6256:
URL: https://github.com/apache/hudi/pull/6256

   Follow up from https://github.com/apache/hudi/pull/5436


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r958611385


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+

Review Comment:
   What do you suggest on when we write out CDC log blocks when we use bucket index? 
   Also why is compaction service not robust?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r951078443


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   OK, but one thing need to be noticed if persist mor's cdc data when compaction. @prasannarajaperumal @xushiyan 
   give an example first: a record(id=1, name=x1) in base file, at t1 commit update  name to x2 (in logFile1), at t2 commit update to x3(in logFile2). CDC should return two changing records (x1->x2, x2->x3).
   The current compaction implement will call `HoodieMergedLogRecordScanner` to get the log records first, then finish the compaction by `HoodieMergeHandler`. But the log records from `HoodieMergedLogRecordScanner` have already combined in advance, so that we lost some cdc info.
   So if we wanna persist cdc when compaction for mor tables, we have to upgrade these related coded: `HoodieMergedLogRecordScanner` to make it return non-combined records, `HoodieCompactor` and `HoodieMergeHandler` to adapt these changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955640553


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 

Review Comment:
   `hoodie.datasource.query.incremental.format=cdc`
   `hoodie.datasource.query.type=incremental`
   
   Can someone explain why we need these two options ? Shouldn't they both be as default and there is no need to configure explicitly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r973652274


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represent `update`; when `op` is `u`, both `before` and `after` don't be null;
 - d: represent `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-### CDC Architecture
+## Configurations
 
-![](arch.jpg)
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-![](points.jpg)
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.begin.instanttime   | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.instanttime     | -              | optional.                                                                                                                            |
 
-### Config Definitions
+### Logical File Types
 
-Define a new config:
-
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
-
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
-
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
-
-
-### CDC File Types
-
-Here we define 5 cdc file types in CDC scenario.
+We define 4 logical file types for the CDC scenario.
 
 - CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
 - ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
 - REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
 - MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
 - REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
 Note:
 
-- **Only `CDC_LOG_File` is a new file type and written out by CDC**. The `ADD_BASE_File`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For some examples:
-  - `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
-  - `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-### Write
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+
+## When `supplemental.logging.mode=KEY_OP`
+
+In this mode, we minimized the additional storage for CDC information.
 
-The idea is to **Write CDC files as little as possible, and reuse data files as much as possible**.
+- When write, only the change type `op`s and record keys are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power. As `op`s and record keys are
+  available, inference using current and previous committed data will be optimized by reducing IO cost of reading
+  previous committed data, i.e., only read changed records.
 
-Hudi writes data by `HoodieWriteHandle`.
-We notice that only `HoodieMergeHandle` and it's subclasses will receive both the old record and the new-coming record at the same time, merge and write.
-So we will add a `LogFormatWriter` in these classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist of `CDCBlock`.
-The CDC log file will be placed in the same position as the base files and other log files, so that the clean service can clean up them without extra work. The file structure is like:
+The detailed logical flows for write and read scenarios are the same regardless of `logging.mode`, which will be
+illustrated in the section below.
+
+## When `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`
+
+Overall logic flows are illustrated below.
+
+![](logic-flows.jpg)
+
+### Write
+
+Hudi writes data by `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses will receive both
+the old record and the new-coming record at the same time, merge and write. So we will add a `LogFormatWriter` in these
+classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist
+of `CDCBlock`. The CDC log file will be placed in the same position as the base files and other log files, so that the
+clean service can clean up them without extra work. The file structure is like:

Review Comment:
   @alexeykudinkin because cdc log block can be skipped by extracting some indicator bytes of data, we think that it is minimal impact to normal read and so mix it with data log block, to stay consistent with data files' lifecycle like cleaning. Chatted with @YannByron, who will look into different name scheme (e.g. .cdc.log) and how cleaner should be tweaked for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r973653224


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+

Review Comment:
   discussed offline we're aligned on this approach that flink write won't produce cdc log block



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r979371837


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   fixed. moved to a table in "Read" section.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r979374352


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represent `update`; when `op` is `u`, both `before` and `after` don't be null;
 - d: represent `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-### CDC Architecture
+## Configurations
 
-![](arch.jpg)
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-![](points.jpg)
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.begin.instanttime   | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.instanttime     | -              | optional.                                                                                                                            |
 
-### Config Definitions
+### Logical File Types
 
-Define a new config:
-
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
-
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
-
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
-
-
-### CDC File Types
-
-Here we define 5 cdc file types in CDC scenario.
+We define 4 logical file types for the CDC scenario.
 
 - CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
 - ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
 - REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
 - MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
 - REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
 Note:
 
-- **Only `CDC_LOG_File` is a new file type and written out by CDC**. The `ADD_BASE_File`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For some examples:
-  - `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
-  - `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-### Write
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+
+## When `supplemental.logging.mode=KEY_OP`
+
+In this mode, we minimized the additional storage for CDC information.
 
-The idea is to **Write CDC files as little as possible, and reuse data files as much as possible**.
+- When write, only the change type `op`s and record keys are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power. As `op`s and record keys are
+  available, inference using current and previous committed data will be optimized by reducing IO cost of reading
+  previous committed data, i.e., only read changed records.
 
-Hudi writes data by `HoodieWriteHandle`.
-We notice that only `HoodieMergeHandle` and it's subclasses will receive both the old record and the new-coming record at the same time, merge and write.
-So we will add a `LogFormatWriter` in these classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist of `CDCBlock`.
-The CDC log file will be placed in the same position as the base files and other log files, so that the clean service can clean up them without extra work. The file structure is like:
+The detailed logical flows for write and read scenarios are the same regardless of `logging.mode`, which will be
+illustrated in the section below.
+
+## When `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`
+
+Overall logic flows are illustrated below.
+
+![](logic-flows.jpg)
+
+### Write
+
+Hudi writes data by `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses will receive both
+the old record and the new-coming record at the same time, merge and write. So we will add a `LogFormatWriter` in these
+classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist
+of `CDCBlock`. The CDC log file will be placed in the same position as the base files and other log files, so that the
+clean service can clean up them without extra work. The file structure is like:

Review Comment:
   discussed with @YannByron that we can have a type of log file with `-cdc` suffix. updated this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r964192075


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -42,11 +43,11 @@ In cases where Hudi tables used as streaming sources, we want to be aware of all
 
 To implement this feature, we need to implement the logic on the write and read path to let Hudi figure out the changed data when read. In some cases, we need to write extra data to help optimize CDC queries.
 
-## Scenarios
+## Scenario Illustration

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r964163474


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |

Review Comment:
   > a detailed instruction on they work for different use cases should be explained.
   
   Yes. as shown above, just 1 config `hoodie.table.cdc.supplemental.logging.mode` is to be exposed and the values indicating different level of logging verbosity should be explained in config docs. By default only op and key are to be logged, as the minimized logging mode. I've updated the PR for this accordingly.



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |

Review Comment:
   > a detailed instruction on they work for different use cases should be explained.
   
   @danny0405 Yes. as shown above, just 1 config `hoodie.table.cdc.supplemental.logging.mode` is to be exposed and the values indicating different level of logging verbosity should be explained in config docs. By default only op and key are to be logged, as the minimized logging mode. I've updated the PR for this accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r964213596


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.start.timestamp     | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.timestamp       | -              | optional.                                                                                                                            |

Review Comment:
   These are not new configs. We're referring to existing configs here
   
   - https://hudi.apache.org/docs/configurations#hoodiedatasourcereadbegininstanttime
   - https://hudi.apache.org/docs/configurations#hoodiedatasourcereadendinstanttime (docs should be updated to say optional instead of required)
   
   I'll also update the config keys to using "begin.instanttime" and "end.instanttime"
   
   For whether to have default value for begin instant time or not, I'm also inclined to have it required and keep the user behavior unchanged. It might improve the UX if we set a default like latest time, but it's a separate topic from this RFC.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945544647


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   @xushiyan @prasannarajaperumal 
   If we want to make this cdc behavior similar to the characteristic of RO/RT for MOR tables, provide a config like `hoodie.datasource.write.cdc.mor.strategy` that have two enum values: realtime and compaction. When `hoodie.datasource.write.cdc.mor.strategy` = `realtime`, we give the cdc result by merging the log files in-flight asap. if it's `compaction`, generate the cdc log files when compaction. in this `compaction` case, user can get the cdc result until compaction is finished.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r946486735


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   Some questions need to be cleared here:
   1. when user wants to read the change log of  commits: c1, c2, c3, but the c1 and c2 are cleaned/archived, what is the behavior/semantics of hudi for both cow and mor table ?
   2. when reading in streaming mode, and because of the consumer lagging, some commits requested are cleaned, what is the behavior, show we throws directly ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r934111567


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |

Review Comment:
   i think one config `hoodie.table.cdc.supplemental.logging` maybe is enough. Less configs will improve the ease of use. If not, `hoodie.table.cdc.supplemental.logging.include_before` will be needed to provide.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945241280


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   > This could be confusing for customers who dont know the implementation details.
   
   I actually don't resonate with this, as I don't see this as exposing implementation details. It's a known characteristic that RO queries for MOR tables depends on compaction. This aligns with MOR semantics: if you need to query efficiently (RO), you have to wait for compaction; if you need to get fresher results (RT/snapshot), you have to spend more computation power. Similarly, if you need to query CDC result efficiently, you wait for compaction; if to get fresher CDC results, spend more computation power to merge the log files in-flight and then compute cdc. Logic for the 2nd case has not been proposed (and it should be) in the PR.
   
   I feel having a new service for this would introduce more impl. complexities and gaps to understand. For example, the way it interplays with what type of index is used and whether cdc is on or not. We should note that in COW compaction happens implicitly, which would simplify the impl. for cdc write-on-compaction due to code path reuse.
   
   As this only affects MOR tables, we should trace back to the MOR semantics: what can users expect to get from MOR?  my understanding is: MOR is the tradeoff btw fast ingestion and query freshness, it applies to the data files, it can also be applied to the cdc data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945245688


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   sure. on the other hand, i'm actually not so sure how helpful to introduce these definitions of logical file types. i felt they are understanding overhead. What do you think? i'm more inclined to removing them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r947417129


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   The valid range of cdc query is same to `timetravel`'s, only the instants in active timeline can be queried.
   
   1. Only c3 can be queried. But if c3 needs to load the previous file slice that has been cleaned, throw exceptions. Because part of cdc data from c3 is lost.
   2.  Unless the files that is needed to be loaded instants in current active timeline have already been cleaned, no exception will be thrown. That means some cdc data from the cleaned instants will be lost. The behavior is like that sync the binlog from mysql to kafka. If binlog is archived before they are synced to kafka, they can't be seen in kafka.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r958613614


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket

Review Comment:
   Ref: https://github.com/apache/hudi/pull/6256#discussion_r933909732



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r964188336


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+
+In summary, we propose CDC data to be persisted synchronously upon base files generation. It is therefore
+write-on-indexing for Spark inserts (non-bucket index) and write-on-compaction for everything else.
+
+Note that it may also be necessary to provide capabilities for asynchronously persisting CDC data, in terms of a
+separate table service like `ChangeTrackingService`, which can be scheduled to fine-tune the CDC-persisting timings.
+This can be used to meet low-latency optimized-read requirements when applicable.

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r966599692


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   > can you pls clarify what cases exactly represent writer anomalies
   
   I'm a little worried about the cdc semantics correctness before it is widely used in production, such as the deletion, the merging, the data sequence.
   
   > 2nd step to support read_optimized i
   
   We can all it an optimization, but not exposed as read_optimized, which conflicts a little with current read_optimized view, WDYT.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945242880


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |

Review Comment:
   agree that we should have an output format config and keep the query type as is. If they turn on cdc on writer side, they should change the incremental format accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r965380973


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+

Review Comment:
   @danny0405 looks like we're aligned here: the proposal here is to make use of compaction generate CDC log blocks. For flink MOR write, no index lookup will be or can be performed anyway. For spark MOR write, index lookup happens anyway regardless of enabling CDC (this depends on index type).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r947420470


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |

Review Comment:
   yes. as we discussed above, i think the use case of hudi cdc is simple.
   
   users need to use hudi in cdc scene, just set `hoodie.table.cdc.enabled` when write, and set `hoodie.datasource.query.type=incremental` and `hoodie.datasource.incremental.output.format=cdc` when read. also this table can be queried in the snapshot mode, or other hudi cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r933909927


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.
 
-Define a new config:
+- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
+- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
+- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
+- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
+Note:
 
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
 
+## When `supplemental.logging=false`
 
-### CDC File Types
+In this mode, we minimized the additional storage for CDC information. 
 
-Here we define 5 cdc file types in CDC scenario.
+- When write, the logging process is similar to the one described in section "When `supplemental.logging=true`", just that only change type `op` and record key are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power.
 
-- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
-- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
-- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
-- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
-- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
+Detailed inference algorithms are illustrated in this [design document](https://docs.google.com/document/d/1vb6EwTqGE0XBpZxWH6grUP2erCjQYb1dS4K24Dk3vL8/).
 
-Note:
+As additional info like `op` and record keys are persisted, inference using current and previous committed data will be optimized by reducing IO cost of reading previous committed data, i.e., only read changed records.
+
+## When `supplemental.logging=true`

Review Comment:
   see previous discussion summary https://github.com/apache/hudi/pull/5436#issuecomment-1156384006



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   Continued discussion from https://github.com/apache/hudi/pull/5436#issuecomment-1156525801



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r934117346


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.
 
-Define a new config:
+- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
+- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
+- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
+- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
+Note:
 
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
 
+## When `supplemental.logging=false`
 
-### CDC File Types
+In this mode, we minimized the additional storage for CDC information. 
 
-Here we define 5 cdc file types in CDC scenario.
+- When write, the logging process is similar to the one described in section "When `supplemental.logging=true`", just that only change type `op` and record key are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power.
 
-- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
-- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
-- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
-- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
-- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
+Detailed inference algorithms are illustrated in this [design document](https://docs.google.com/document/d/1vb6EwTqGE0XBpZxWH6grUP2erCjQYb1dS4K24Dk3vL8/).

Review Comment:
   this google doc and the algorithm to extract cdc when `supplemental.logging=false` is not same.
   both the two algorithms will load the previous version and the current version of a file group to two maps which the key is the record key and the value is the whole record. But in case that we query cdc when `supplemental.logging=false`, just iterator the cdc_log_file and use the record key to get the `before` and `after` values in ad-hoc way. The algorithm in this doc is like a hash-map join to extract cdc.
   Assume that the previous version has M records, the current one has N records, and there are R records updated/deleted/inserted. The algorithm when `supplemental.logging=false` spends O(R), the one in doc spends O(M+N). If i'm wrong, correct me please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r958606223


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 

Review Comment:
   Ref comment - https://github.com/apache/hudi/pull/6256#discussion_r938529820
   query type incremental does not necessarily mean output should be in CDC format since we define incremental query as the latest state in table schema. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r958603552


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -42,11 +43,11 @@ In cases where Hudi tables used as streaming sources, we want to be aware of all
 
 To implement this feature, we need to implement the logic on the write and read path to let Hudi figure out the changed data when read. In some cases, we need to write extra data to help optimize CDC queries.
 
-## Scenarios
+## Scenario Illustration

Review Comment:
   I am just calling out that insert and delete should not skip the row key in the CDC stream. I see a implementation of producing a CDC stream with just from and to version and this does not cover the above requirement. Calling this out explicitly helps. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r938603827


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   This is an interesting trade-off. I understand Compaction is the one common place today where we know for sure the op type for all supported indexing. I am not sure about marrying 2 different concepts here - CDC stream freshness and Query efficiency (compaction frequency). This could be confusing for customers who dont know the implementation details. 
   
   I was thinking along the lines of a new table service, ChangeTrackLoggingService, we can then
   - Asynchronous cdc option in addition
         - no cdc related ingestion failures and no performance hit. 
   - Optional for other indexes, but mandatory for bucket indexing when cdc is on
   - Cant be frequently than compaction - lightweight when supplimental.logging is off. determines the CDC stream freshness.
   
   This also means a different timeline instant to manage. I will think more on this and wait for suggestions as well. 
   
   In general I am not a big fan of supplimental.logging=true - I think duplicating data values is something we should avoid completely. I dont even know how to deal with data inconsistencies between cdc blocks and data blocks if we end up in that state. 
   
   cc @xushiyan @danny0405 @YannByron @vinothchandar 



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |

Review Comment:
   +1 to @YannByron  - yes enabling CDC does not automatically mean incremental queries will always produce CDC format. 
   We need a incremental.output.format to distingush between (latest_state - default, cdc)
   
   ```
   hoodie.datasource.query.type=incremental
   hoodie.datasource.incremental.output.format=cdc
   
   ```



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.
 
-Define a new config:
+- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
+- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
+- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
+- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
+Note:
 
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
 
+## When `supplemental.logging=false`
 
-### CDC File Types
+In this mode, we minimized the additional storage for CDC information. 
 
-Here we define 5 cdc file types in CDC scenario.
+- When write, the logging process is similar to the one described in section "When `supplemental.logging=true`", just that only change type `op` and record key are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power.
 
-- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
-- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
-- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
-- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
-- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
+Detailed inference algorithms are illustrated in this [design document](https://docs.google.com/document/d/1vb6EwTqGE0XBpZxWH6grUP2erCjQYb1dS4K24Dk3vL8/).

Review Comment:
   Overall I think there is a difference in the design proposed in HUDI-3478 and in HUDI-1771
   
   HUDI-1771 is a simple elegant design but as mentioned in [comment](https://github.com/apache/hudi/pull/5436#issuecomment-1206165913) - I am more inclined to the RFC-51 design that tracks the op and row_keys in the CDC log block and the reader doing a hash based join with this and the required file slices to compute the CDC stream.
   
   This will be a batched row_keys lookup and we should be efficient in doing that in all engines and the complexity is close to O(updates) and that is very reasonable rather than O(data accessed) like @YannByron mentions here. 
   



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;

Review Comment:
   Looks like the scope is going beyond Spark - lets mention that in the goals. 



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |

Review Comment:
   Combine
   Instead of hoodie.table.cdc.supplemental.logging, hoodie.table.cdc.supplemental.logging.include_after
   Make this a generic extensible config on what to log - 
   hoodie.table.cdc.supplemental.logging.mode?
    
   possible values could be
   
   - min_cdc_metadata (op and key)
   - cdc_data_before
   - cdc_data_before_after
   
   (something to that effect ... )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r979371774


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +155,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.

Review Comment:
   it is meant for persisting CDC data in `-cdc` log files; it does not mean CDC result is not available to users. Users will get the freshest CDC via on-the-fly inference. The problem with flink or bucket index is op (I/U/D) is not available during delta commit hence we can't persist CDC at that point. That's why persisting CDC is deferred to compactor for standardization. I have improve the language below this part to make it clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r980381575


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +155,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.

Review Comment:
   @xushiyan can you elaborate about "on-the-fly inference", i don't think i saw it being mentioned anywhere in the RFC. 
   
   I still don't think i fully understand how we're going to be tackling the issue of CDC records being deferred until compactor runs. What if compactor isn't even setup to run for the table? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r934112569


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |

Review Comment:
   see this: https://github.com/apache/hudi/pull/5885#issuecomment-1189748888 and the related discussion.
   there are some differences between cdc query and inc query. i need to  another config like `hoodie.datasource.incremental.output` to distinguish both. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945554115


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.
 
-Define a new config:
+- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
+- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
+- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
+- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
+Note:
 
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
 
+## When `supplemental.logging=false`
 
-### CDC File Types
+In this mode, we minimized the additional storage for CDC information. 
 
-Here we define 5 cdc file types in CDC scenario.
+- When write, the logging process is similar to the one described in section "When `supplemental.logging=true`", just that only change type `op` and record key are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power.
 
-- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
-- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
-- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
-- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
-- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
+Detailed inference algorithms are illustrated in this [design document](https://docs.google.com/document/d/1vb6EwTqGE0XBpZxWH6grUP2erCjQYb1dS4K24Dk3vL8/).

Review Comment:
   maybe `read_cdc_log_file.png` is enough to explain this logic. 
   we can update this png if we have already used `hoodie.table.cdc.supplemental.logging.mode` instead of `hoodie.table.cdc.supplemental.logging`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945438639


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   Efficiency and Data availability are distinct things to trade off on. MoR semantics trades off efficiency but the plan on CDC on compaction tradesoff stream availability (not efficiency?). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on PR #6256:
URL: https://github.com/apache/hudi/pull/6256#issuecomment-1222445640

   @xushiyan - Do you want to take a final stab at the RFC incorporating all the changes? I believe we have consensus on what needs to be done. Please correct me if I am wrong.
   cc @YannByron @danny0405 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r951540267


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   +1 for this. i also have discussed with @xushiyan that a separate jira to track that write cdc when compaction. and provide a config to indicate that extract cdc data from log files in a flight way or cdc files which are written when compaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on PR #6256:
URL: https://github.com/apache/hudi/pull/6256#issuecomment-1223535084

   > @xushiyan - Do you want to take a final stab at the RFC incorporating all the changes? I believe we have consensus on what needs to be done. Please correct me if I am wrong. cc @YannByron @danny0405
   
   @prasannarajaperumal I've updated the RFC as per the recent discussions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r947401482


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |

Review Comment:
   All the existing features are still supported when `hoodie.table.cdc.enabled` is true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945243884


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.
 
-Define a new config:
+- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
+- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
+- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
+- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
+Note:
 
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
 
+## When `supplemental.logging=false`
 
-### CDC File Types
+In this mode, we minimized the additional storage for CDC information. 
 
-Here we define 5 cdc file types in CDC scenario.
+- When write, the logging process is similar to the one described in section "When `supplemental.logging=true`", just that only change type `op` and record key are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power.
 
-- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
-- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
-- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
-- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
-- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
+Detailed inference algorithms are illustrated in this [design document](https://docs.google.com/document/d/1vb6EwTqGE0XBpZxWH6grUP2erCjQYb1dS4K24Dk3vL8/).

Review Comment:
   yes the original algorithm in the doc assumes no cdc data (key and op) is logged hence it won't be as optimized as when logging key and op. it should be updated accordingly, at least on this PR to align with the case of logging key and op.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945241280


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   > This could be confusing for customers who dont know the implementation details.
   
   I actually don't resonate with this, as I don't see this as exposing implementation details. It's a known characteristic that RO queries for MOR tables depends on compaction. This aligns with MOR semantics: if you need to query efficiently (RO), you have to wait for compaction; if you need to get fresher results (RT/snapshot), you have to spend more computation power. Similarly, if you need to query CDC result efficiently, you wait for compaction; if to get fresher CDC results, spend more computation power to diff the log files. Logic for the 2nd case has not been proposed (and it should be) in the PR.
   
   I feel having a new service for this would introduce more impl. complexities and gaps to understand. For example, the way it interplays with what type of index is used and whether cdc is on or not. We should note that in COW compaction happens implicitly, which would simplify the impl. for cdc write-on-compaction due to code path reuse.
   
   As this only affects MOR tables, we should trace back to the MOR semantics: what can users expect to get from MOR?  my understanding is: MOR is the tradeoff btw fast ingestion and query freshness, it applies to the data files, it can also be applied to the cdc data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945546131


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |

Review Comment:
   +1 for `hoodie.table.cdc.supplemental.logging.mode` that have these three values for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945577916


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   What is the logical file type here ? To help understand the work flow of writer/reader ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r960391247


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.start.timestamp     | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.timestamp       | -              | optional.                                                                                                                            |

Review Comment:
   > Why would start.timestamp have a default value?
   
   A default value as latest commit keeps more align with message queue like Kafka, by default, they consumes from the latest for the consumer.
   
   > end.timestamp default value should be the latest commit.
   
   Agree with this but only for the case that end.timestamp exists which means it is a batch query, remember that we also have streaming query, there is no real end.timestamp in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r960387640


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 

Review Comment:
   Got you, i agree with the point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r951513421


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   Yes - HoodieMergedLogRecordScanner needs to store CDC state on each merged row which could be used in writing the CDC block after compaction. I think we should mention this explicitly in the design - It is okay to track this as a seperate jira and get back to it after the initial implementation. 
   
   ```Compaction will produce a row for every update recorded in the log files```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r949833823


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   Let us agree conceptually in the RFC the CDC stream consistency can be synchronous or asynchronous to the write. Freshness of this stream can be tied to compaction to begin with (potentially having a new background service if required in future). If we ensure that the CDC log writing logic is well abstracted and shared between the upsert and compaction we should be easily extend this to another service in future.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945242513


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |

Review Comment:
   The intention is, when users don't set any of these (everything default), cdc data will be minimized. And the more config turn on, more data will be logged. An enum config "logging.mode" as mentioned above does help simplify it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r960323342


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   There are indexing choices which result in eventual CDC stream freshness and we could put the impl of constructing CDC on the read to later point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r958608922


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.start.timestamp     | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.timestamp       | -              | optional.                                                                                                                            |

Review Comment:
   Why would ```start.timestamp``` have a default value? This is essentially the consumer marker and this is stored outside of Hudi. ```end.timestamp``` default value should be the latest commit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r960403535


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+

Review Comment:
   > What do you suggest on when we write out CDC log blocks when we use bucket index
   
   Does the index type (bloom filter/bucket) affect the CDC log blocks here ? Personally i think generating the log blocks during write on the fly is not very feasible because looking up the history records index would cost much which reduce the write throughput.
   
   We can generate the cdc on the fly for the mor table reader, and the CDC log blocks generated by compaction is a optimization view like the cow table.
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r965380973


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+

Review Comment:
   @danny0405 looks like we're aligned here: the proposal here is to make use of compaction generate CDC log blocks. A special case is spark MOR insert: generate CDC `I` log block during write. That's why we mentioned this statement
   
   > The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
   
   As for index lookup impacting write throughput: for flink MOR write, no index lookup will be or can be performed anyway. For spark MOR write, index lookup happens anyway regardless of enabling CDC (this depends on index type).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945544647


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   @xushiyan @prasannarajaperumal 
   If we want to make this cdc behavior similar to the characteristic of RO/RT for MOR tables, provide a config like `hoodie.datasource.write.cdc.mor.strategy` that have two enum values: realtime and compaction. When `hoodie.datasource.write.cdc.mor.strategy` = `realtime`, we give the cdc result by merging the log files in-flight asap. if it's `compaction`, generate the cdc log files when compaction. in this `compaction` case, user can get the cdc result until compaction is finished.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r960407505


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   > CDC on the fly is something we dont have to support in the implementation right away I think.
   
   IMO, this is on the first clazz priority because it is robust fallback for the writer anomalies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r970277127


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represent `update`; when `op` is `u`, both `before` and `after` don't be null;
 - d: represent `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-### CDC Architecture
+## Configurations
 
-![](arch.jpg)
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-![](points.jpg)
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.begin.instanttime   | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.instanttime     | -              | optional.                                                                                                                            |
 
-### Config Definitions
+### Logical File Types
 
-Define a new config:
-
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
-
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
-
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
-
-
-### CDC File Types
-
-Here we define 5 cdc file types in CDC scenario.
+We define 4 logical file types for the CDC scenario.
 
 - CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
 - ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
 - REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
 - MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
 - REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
 Note:
 
-- **Only `CDC_LOG_File` is a new file type and written out by CDC**. The `ADD_BASE_File`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For some examples:
-  - `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
-  - `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-### Write
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+
+## When `supplemental.logging.mode=KEY_OP`
+
+In this mode, we minimized the additional storage for CDC information.
 
-The idea is to **Write CDC files as little as possible, and reuse data files as much as possible**.
+- When write, only the change type `op`s and record keys are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power. As `op`s and record keys are
+  available, inference using current and previous committed data will be optimized by reducing IO cost of reading
+  previous committed data, i.e., only read changed records.
 
-Hudi writes data by `HoodieWriteHandle`.
-We notice that only `HoodieMergeHandle` and it's subclasses will receive both the old record and the new-coming record at the same time, merge and write.
-So we will add a `LogFormatWriter` in these classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist of `CDCBlock`.
-The CDC log file will be placed in the same position as the base files and other log files, so that the clean service can clean up them without extra work. The file structure is like:
+The detailed logical flows for write and read scenarios are the same regardless of `logging.mode`, which will be
+illustrated in the section below.
+
+## When `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`
+
+Overall logic flows are illustrated below.
+
+![](logic-flows.jpg)
+
+### Write
+
+Hudi writes data by `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses will receive both
+the old record and the new-coming record at the same time, merge and write. So we will add a `LogFormatWriter` in these
+classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist
+of `CDCBlock`. The CDC log file will be placed in the same position as the base files and other log files, so that the
+clean service can clean up them without extra work. The file structure is like:

Review Comment:
   AFAIU `CDCBlocks` will be persisted w/in the same log-files as data-blocks for MOR, right? 
   
   I don't think we can do that as this will have serious performance implications for MOR tables that will have to skip over `CDCBlocks` for pure non-CDC queries



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945574953


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |

Review Comment:
   Should we expose such parameters ? They looks confusing at first glance, if we expose them, a detailed instruction on they work for different use cases should be explained.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r946486735


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   Some questions need to be cleared here:
   1. when user wants to read the change log of  commits: c1, c2, c3, but the c1 and c2 are cleaned/archived, what is the behavior/semantics of hudi for both cow and mor table ?
   2. when reading in streaming mode, and because of the consumer lagging, some commits requested are cleaned, what is the behavior, should it throws directly ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r947407085


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |

Review Comment:
   We should make the user story/use cases clear, roughly enable all the options make things a mess and user would feel confused easily.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955643711


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket

Review Comment:
   So what is the solution here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r965393070


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   > it is robust fallback for the writer anomalies.
   
   can you pls clarify what cases exactly represent writer anomalies? if write failed, data and CDC data should be discarded and commit should abort. In case of data corruption, computing with whatever data persisted there anyway won't be reliable, will it? i think freshness is instead the main reason for doing CDC on the fly.
   
   As for the plan to support read mode, I discussed with @YannByron that computing CDC on the fly (`snapshot` mode) is an easier first implementation step. 2nd step to support `read_optimized` is meant for customers with no strong freshness requirement and want to save compute costs pulling CDC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r965376782


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   These Logical file types are meant to group files/filesgroups logically based on different write scenarios specific for producing CDC data. I had similar concern on understanding these concepts, which are not generic hudi concepts. we should just list out a table here to map Hudi actions -> what files written out -> what CDC data is contained. always easier for people to digest when leverage on existing knowledge (actions here) instead of bringing up new definitions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r981216616


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +155,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.

Review Comment:
   @alexeykudinkin in the last update, i added a table in the Read section "How to infer CDC results", which explains on-the-fly inference. In case of no compaction for the table, CDC queries will always do on-the-fly inference. The Read section "MOR" subsection described about this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r939502111


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   > This could be confusing for customers who dont know the implementation details.
   I agree with this. we should try to keep the behavior of hudi cdc consistent with customers's perception, and provide explanations only where inconsistent (for ex. bucket index)
   
   Adding the new table service `ChangeTrackLoggingService`, that is worth thinking about. And customers also need to know this, know when they can deal with the hudi cdc stream.
   
   About supplimental.logging=true, let us provide the two modes to deal with situations where customers are more concerned about query efficiency or write efficiency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r960320320


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   I agree. We can skip providing read optimized CDC - not easy to explain/understand. We can mention in the design that we can either compute the CDC on the fly or precompute it during the write/compaction. CDC on the fly is something we dont have to support in the implementation right away I think.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r979372604


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   linked in "rollout" section



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r934113335


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   @xushiyan actually 5 file types. do you want to deal with `MOR_LOG_FILE` after we make an agreement about the cdc query on mor tables between flink and spark?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r953468420


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+
+In summary, we propose CDC data to be persisted synchronously upon base files generation. It is therefore
+write-on-indexing for Spark inserts (non-bucket index) and write-on-compaction for everything else.
+
+Note that it may also be necessary to provide capabilities for asynchronously persisting CDC data, in terms of a
+separate table service like `ChangeTrackingService`, which can be scheduled to fine-tune the CDC-persisting timings.

Review Comment:
   CDC Availability SLA, effectively decoupling it with Compaction frequency



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   @xushiyan @YannByron - Lets link the jira once its created. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955641659


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.start.timestamp     | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.timestamp       | -              | optional.                                                                                                                            |

Review Comment:
   `hoodie.datasource.read.start.timestamp`
   `hoodie.datasource.read.end.timestamp`
   
   Would these two options be needed by the fs view API or only for reader/writer ? The `start.timestamp` should also have a default value and by default, consumes from the latest commit.
   
   We should mark it clearly what is the default value here if it is optional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r966591772


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   > always easier for people to digest when leverage on existing knowledge (actions here) instead of bringing up new definitions
   
   Cann't agree more, let's not introduce new terminology.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r971371531


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represent `update`; when `op` is `u`, both `before` and `after` don't be null;
 - d: represent `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines

Review Comment:
   Let's also explicitly call out that:
   
   - For CDC-enabled Tables performance of non-CDC queries should not be affected



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   Agree w/ @xushiyan proposal, let's simplify this -- having a table mapping an action on the Data table into the action on CDC log makes message much more clear.
   



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +155,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.

Review Comment:
   I don't think i understand the proposal here: are we saying that we're not going to be producing the CDC records until compaction is performed? 
   
   I think this is a serious consistency gap: after Delta Commit records are already persisted and are visible to queries, hence someone can actually read the table and see these records persisted but when issuing CDC query they won't see these records.
   
   Can you please elaborate what is the challenge here? What's the issue w/ Flink writer? Why Bucket Index affects this?



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represent `update`; when `op` is `u`, both `before` and `after` don't be null;
 - d: represent `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-### CDC Architecture
+## Configurations
 
-![](arch.jpg)
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-![](points.jpg)
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.begin.instanttime   | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.instanttime     | -              | optional.                                                                                                                            |
 
-### Config Definitions
+### Logical File Types
 
-Define a new config:
-
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
-
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
-
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
-
-
-### CDC File Types
-
-Here we define 5 cdc file types in CDC scenario.
+We define 4 logical file types for the CDC scenario.
 
 - CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
 - ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
 - REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
 - MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
 - REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
 Note:
 
-- **Only `CDC_LOG_File` is a new file type and written out by CDC**. The `ADD_BASE_File`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For some examples:
-  - `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
-  - `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-### Write
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+
+## When `supplemental.logging.mode=KEY_OP`
+
+In this mode, we minimized the additional storage for CDC information.
 
-The idea is to **Write CDC files as little as possible, and reuse data files as much as possible**.
+- When write, only the change type `op`s and record keys are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power. As `op`s and record keys are
+  available, inference using current and previous committed data will be optimized by reducing IO cost of reading
+  previous committed data, i.e., only read changed records.
 
-Hudi writes data by `HoodieWriteHandle`.
-We notice that only `HoodieMergeHandle` and it's subclasses will receive both the old record and the new-coming record at the same time, merge and write.
-So we will add a `LogFormatWriter` in these classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist of `CDCBlock`.
-The CDC log file will be placed in the same position as the base files and other log files, so that the clean service can clean up them without extra work. The file structure is like:
+The detailed logical flows for write and read scenarios are the same regardless of `logging.mode`, which will be
+illustrated in the section below.
+
+## When `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`
+
+Overall logic flows are illustrated below.
+
+![](logic-flows.jpg)
+
+### Write
+
+Hudi writes data by `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses will receive both
+the old record and the new-coming record at the same time, merge and write. So we will add a `LogFormatWriter` in these
+classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist
+of `CDCBlock`. The CDC log file will be placed in the same position as the base files and other log files, so that the
+clean service can clean up them without extra work. The file structure is like:

Review Comment:
   Understood. I appreciate the intent, but i think we should prioritize querying performance over how much we need to tweak cleaner to support it. 
   
   Problem w/ existing approach is that we can't cleanly separate out CDC/Data log-files w/o having separate naming schemes for them -- even though, we might be creating a new CDC log file for every write we do, whenever we do Delta Commit (on HDFS) we will be appending the log-blocks to the latest log-file present w/o checking whether it's CDC or Data log-file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r964166807


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.
 
-Define a new config:
+- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
+- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
+- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
+- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
 
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
+Note:
 
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
+**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario. 
 
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
+For examples:
+- `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
+- `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
 
+## When `supplemental.logging=false`
 
-### CDC File Types
+In this mode, we minimized the additional storage for CDC information. 
 
-Here we define 5 cdc file types in CDC scenario.
+- When write, the logging process is similar to the one described in section "When `supplemental.logging=true`", just that only change type `op` and record key are persisted.
+- When read, changed info will be inferred on-the-fly, which costs more computation power.
 
-- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
-- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
-- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
-- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
-- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
+Detailed inference algorithms are illustrated in this [design document](https://docs.google.com/document/d/1vb6EwTqGE0XBpZxWH6grUP2erCjQYb1dS4K24Dk3vL8/).

Review Comment:
   Have updated the PR for this accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r979374427


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represent `update`; when `op` is `u`, both `before` and `after` don't be null;
 - d: represent `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan merged pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan merged PR #6256:
URL: https://github.com/apache/hudi/pull/6256


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] YannByron commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
YannByron commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945547502


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   I think keeping them can help the developers to understand this design details.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945577068


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |

Review Comment:
   Shouldn't the cdc read just an incremental query here with a specific cdc format ? I mean the `hoodie.datasource.query.type` should force to be `incremental` if `hoodie.table.cdc.enabled` is true.
   
   And i have a question: what is the behavior if user opens option `hoodie.table.cdc.enabled` and they want a snapshot query, should we allow that ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r945241280


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,27 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Write-on-indexing vs Write-on-compaction

Review Comment:
   > This could be confusing for customers who dont know the implementation details.
   
   I actually don't resonate with this, as I don't see this as exposing implementation details. It's a known characteristic that RO queries for MOR tables depends on compaction. This aligns with MOR semantics: if you need to query efficiently (RO), you have to wait for compaction; if you need to get fresher results (RT/snapshot), you have to spend more computation power. Similarly, if you need to query CDC result efficiently, you wait for compaction; if to get fresher CDC results, spend more computation power to merge the log files in-flight and then get cdc results. Logic for the 2nd case has not been proposed (and it should be) in the PR.
   
   I feel having a new service for this would introduce more impl. complexities and gaps to understand. For example, the way it interplays with what type of index is used and whether cdc is on or not. We should note that in COW compaction happens implicitly, which would simplify the impl. for cdc write-on-compaction due to code path reuse.
   
   As this only affects MOR tables, we should trace back to the MOR semantics: what can users expect to get from MOR?  my understanding is: MOR is the tradeoff btw fast ingestion and query freshness, it applies to the data files, it can also be applied to the cdc data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r964188068


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+
+In summary, we propose CDC data to be persisted synchronously upon base files generation. It is therefore
+write-on-indexing for Spark inserts (non-bucket index) and write-on-compaction for everything else.
+
+Note that it may also be necessary to provide capabilities for asynchronously persisting CDC data, in terms of a
+separate table service like `ChangeTrackingService`, which can be scheduled to fine-tune the CDC-persisting timings.

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r966598104


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+

Review Comment:
   That's true, but by default flink does not take the look up(for better throughput), and we may generate the changelog for reader on the fly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r973655202


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   chatted with @YannByron and @danny0405 and we aligned on removing the config and allow cdc query to merge on-the-fly result with compacted results, as an optimized approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
prasannarajaperumal commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r953478052


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket

Review Comment:
   in case where row_key lookup is NOT done during writing (e.g. Bucket Index) the `op` data cannot be deduced and hence the CDC log block cannot be written out. 



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+
+In summary, we propose CDC data to be persisted synchronously upon base files generation. It is therefore
+write-on-indexing for Spark inserts (non-bucket index) and write-on-compaction for everything else.
+
+Note that it may also be necessary to provide capabilities for asynchronously persisting CDC data, in terms of a
+separate table service like `ChangeTrackingService`, which can be scheduled to fine-tune the CDC-persisting timings.
+This can be used to meet low-latency optimized-read requirements when applicable.

Review Comment:
   I would take this line out. Not related in my opinion. 



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -42,11 +43,11 @@ In cases where Hudi tables used as streaming sources, we want to be aware of all
 
 To implement this feature, we need to implement the logic on the write and read path to let Hudi figure out the changed data when read. In some cases, we need to write extra data to help optimize CDC queries.
 
-## Scenarios
+## Scenario Illustration

Review Comment:
   Can we call out / illustrate the scenarios where insert&delete and delete&insert should produce separate CDC rows and that is a requirement for this design



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955641659


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,71 +65,74 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query.                                                                                          |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.start.timestamp     | -              | requried.                                                                                                                            |
+| hoodie.datasource.read.end.timestamp       | -              | optional.                                                                                                                            |

Review Comment:
   `hoodie.datasource.read.start.timestamp`
   `hoodie.datasource.read.end.timestamp`
   
   Would these two options be needed by the fs view API or only for reader/writer ? The `start.timestamp` should also have a default value and by default, consumes from the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955638291


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -42,11 +43,11 @@ In cases where Hudi tables used as streaming sources, we want to be aware of all
 
 To implement this feature, we need to implement the logic on the write and read path to let Hudi figure out the changed data when read. In some cases, we need to write extra data to help optimize CDC queries.
 
-## Scenarios
+## Scenario Illustration

Review Comment:
   > should produce separate CDC rows
   
   I guess this is a must ? How could you combine them in one row, the schema does not match.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955642598


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   Can someone give a explanation here ? What is exactly a logical file type ? name it action seems more suitable here?



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default  | description                                                                                                                                      |
+|-----------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                    |
+| hoodie.table.cdc.supplemental.logging               | `false`  | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false`  | If `true`, persist `after` as well.                                                                                                              |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enable=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | requried.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   Can someone give an explanation here ? What is exactly a logical file type ? name it action seems more suitable here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955645966


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   Do we need this two config options then ? 
   `hoodie.datasource.query.incremental.type=snapshot`
   `hoodie.datasource.query.incremental.type=read_optimized`
   
   Very confusing from my side, shouldn't it always be reading the refresh cdc data here ? Why we expose a ro view in cdc streaming read, for what use case ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r955644523


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +152,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+2 design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest, however, in case of Flink writer or Bucket
+indexing, `op` (I/U/D) data is not available at indexing.
+
+Write-on-compaction can always persist CDC info and achieve standardization of implementation logic across engines,
+however, some delays are added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+

Review Comment:
   >inserts/updates/deletes that written to log files are compacted into base files: the CDC data `op=I|U|D` will be
       persisted
       
   I don't think the compaction generated cdc logs makes any sense in production, it lost the data freshness for cdc stream and it relies on the compaction service which it self is not a very robust infrastructure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #6256: [RFC-51][HUDI-3478] Update RFC: CDC support

Posted by GitBox <gi...@apache.org>.
xushiyan commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r969293443


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -215,18 +245,31 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages all the things on CDC, and all the spark entrances(DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.
+
+When users want to get fresher real-time CDC results:
+
+- users are to set `hoodie.datasource.query.incremental.type=snapshot`
+- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
+  current and previous file slices).
+- this is equivalent to running incremental-query on MOR RT tables
+
+When users want to optimize compute-cost and are tolerant with latency of CDC results,
+
+- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
+- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
+  current and previous file slices).

Review Comment:
   @danny0405 i agree on the design perspective, it's ideal that we don't need to let users configure this. Compaction generated CDC data should be stitched with on-the-fly inferred CDC data to optimize perf automatically. @YannByron did mention some implementation challenges. But from RFC standpoint, we should aim for the better design and strive for ways to achieve it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org