Posted to commits@hudi.apache.org by xu...@apache.org on 2022/09/25 08:32:39 UTC

[hudi] branch master updated: [RFC-51][HUDI-3478] Update RFC: CDC support (#6256)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fa6990bea1 [RFC-51][HUDI-3478] Update RFC: CDC support (#6256)
fa6990bea1 is described below

commit fa6990bea14894d4b3ad7c8e4cdd067a5db71605
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Sun Sep 25 16:32:33 2022 +0800

    [RFC-51][HUDI-3478] Update RFC: CDC support (#6256)
---
 rfc/rfc-51/{points.jpg => code-paths.jpg}          | Bin
 rfc/rfc-51/{arch.jpg => logic-flows.jpg}           | Bin
 rfc/rfc-51/rfc-51.md                               | 217 +++++++++++++--------
 ...io-definition.jpg => scenario-illustration.jpg} | Bin
 4 files changed, 138 insertions(+), 79 deletions(-)

diff --git a/rfc/rfc-51/points.jpg b/rfc/rfc-51/code-paths.jpg
similarity index 100%
rename from rfc/rfc-51/points.jpg
rename to rfc/rfc-51/code-paths.jpg
diff --git a/rfc/rfc-51/arch.jpg b/rfc/rfc-51/logic-flows.jpg
similarity index 100%
rename from rfc/rfc-51/arch.jpg
rename to rfc/rfc-51/logic-flows.jpg
diff --git a/rfc/rfc-51/rfc-51.md b/rfc/rfc-51/rfc-51.md
index ea4f8a6a45..5f3f7ab49b 100644
--- a/rfc/rfc-51/rfc-51.md
+++ b/rfc/rfc-51/rfc-51.md
@@ -26,6 +26,7 @@
 - @Raymond
 - @Vinoth
 - @Danny
+- @Prasanna
 
 # Status
 JIRA: [https://issues.apache.org/jira/browse/HUDI-3478](https://issues.apache.org/jira/browse/HUDI-3478)
@@ -42,16 +43,16 @@ In cases where Hudi tables used as streaming sources, we want to be aware of all
 
 To implement this feature, we need logic on the write and read paths so that Hudi can figure out the changed data at read time. In some cases, we need to write extra data to help optimize CDC queries.
 
-## Scenarios
+## Scenario Illustration
 
-Here is a simple case to explain the CDC.
+The diagram below illustrates a typical CDC scenario.
 
-![](scenario-definition.jpg)
+![](scenario-illustration.jpg)
 
 We follow the Debezium output format, with the columns shown below:
 
 - op: the operation of this record;
-- ts_ms: the timestamp;
+- ts: the timestamp;
 - source: source information such as the name of database and table. **Maybe we don't need this column in Hudi**;
 - before: the previous image before this operation;
 - after: the current image after this operation;
@@ -62,76 +63,64 @@ We follow the debezium output format: four columns as shown below
 - u: represents `update`; when `op` is `u`, neither `before` nor `after` is null;
 - d: represents `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case the same record undergoes a sequence of operations such as insert -> delete -> insert, CDC data should be produced to reflect each of these changes.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
+5. For CDC-enabled tables, non-CDC queries' performance should not be affected
 
-### CDC Architecture
+## Configurations
 
-![](arch.jpg)
+| key                                                 | default  | description                                                                                                                                                                                                                                                                                                          |
+|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.table.cdc.enabled                            | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly.                                                                                                                                                                                        |
+| hoodie.table.cdc.supplemental.logging.mode          | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-![](points.jpg)
+| key                                        | default        | description                                                                                                                          |
+|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.datasource.query.type               | `snapshot`     | Set to `incremental` for incremental queries.                                                                                         |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.begin.instanttime   | -              | Required.                                                                                                                            |
+| hoodie.datasource.read.end.instanttime     | -              | Optional.                                                                                                                            |
 
-### Config Definitions
+## When `supplemental.logging.mode=KEY_OP`
 
-Define a new config:
+In this mode, the additional storage for CDC information is minimized.
 
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
+- On write, only the change type (`op`) and record keys are persisted.
+- On read, the changed data is inferred on the fly, which costs more compute. Since the `op`s and record keys are
+  available, inference using current and previous committed data can be optimized by reducing the IO cost of reading
+  previously committed data, i.e., only the changed records need to be read (see the sketch below).
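+
+Below is a minimal sketch of this inference idea in Spark, assuming hypothetical DataFrames `keyOps` (the persisted
+record key and `op` pairs), `prevSlice` and `currSlice` (the previous and current committed data of the touched file
+slice, with illustrative `recordKey`/`value` columns); these names are assumptions, not the actual implementation:
+
+```scala
+// Sketch only: infer before/after images from persisted (recordKey, op) pairs.
+// keyOps, prevSlice, currSlice are hypothetical DataFrames; column names are assumptions.
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.col
+
+def inferCdc(keyOps: DataFrame, prevSlice: DataFrame, currSlice: DataFrame): DataFrame = {
+  // Only the keys listed in the CDC block need to be read from the previous slice.
+  val before = prevSlice.join(keyOps.select("recordKey"), Seq("recordKey"), "left_semi")
+  keyOps
+    .join(before.withColumnRenamed("value", "before"), Seq("recordKey"), "left")
+    .join(currSlice.withColumnRenamed("value", "after"), Seq("recordKey"), "left")
+    .select(col("op"), col("recordKey"), col("before"), col("after"))
+}
+```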
 
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
+The detailed logical flows for write and read scenarios are the same regardless of `logging.mode`, which will be
+illustrated in the section below.
 
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
+## When `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`
 
+Overall logic flows are illustrated below.
 
-### CDC File Types
-
-Here we define 5 cdc file types in CDC scenario.
-
-- CDC_LOG_File: a file consists of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
-- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file are new-incoming. For example, we first write data to a new file group. So we can load this file, treat each record in this as the value of `after`, and the value of `op` of each record is `i`.
-- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this will be generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in this as the value of `before`, and the value of `op` of each record is `d`.
-- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of file slice, and merge each record in the log file with this data loaded separately to determine how the record has changed, and get the value of `before` and `after`.
-- REPLACED_FILE_GROUP: a file group that be replaced totally, like `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and the value of `op` of each record is `d`.
-
-Note:
-
-- **Only `CDC_LOG_File` is a new file type and written out by CDC**. The `ADD_BASE_File`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For some examples:
-  - `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
-  - `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+![](logic-flows.jpg)
 
 ### Write
 
-The idea is to **Write CDC files as little as possible, and reuse data files as much as possible**.
-
-Hudi writes data by `HoodieWriteHandle`.
-We notice that only `HoodieMergeHandle` and it's subclasses will receive both the old record and the new-coming record at the same time, merge and write.
-So we will add a `LogFormatWriter` in these classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist of `CDCBlock`.
-The CDC log file will be placed in the same position as the base files and other log files, so that the clean service can clean up them without extra work. The file structure is like:
+Hudi writes data by `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses receive both
+the old record and the incoming record at the same time, merge them, and write the result. So we will add a `LogFormatWriter` in these
+classes. If there is CDC data to be written out, this writer writes out a log file consisting
+of `CDCBlock`s. The CDC log files carry a `-cdc` suffix (to distinguish them from data log files for read performance
+reasons) and will be placed in the same paths as the base files and other log files. The clean service needs to be tweaked
+accordingly. An example of the file structure:
 
 ```
-hudi_cdc_table/
+cow_table/
     .hoodie/
         hoodie.properties
         00001.commit
@@ -139,29 +128,57 @@ hudi_cdc_table/
         ...
     year=2021/
         filegroup1-instant1.parquet
-        .filegroup1-instant1.log
+        .filegroup1-instant1-cdc
     year=2022/
         filegroup2-instant1.parquet
-        .filegroup2-instant1.log
+        .filegroup2-instant1-cdc
     ...
 ```
 
-Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
+Under partition directories, the `-cdc` files with `CDCBlock` as shown above contain the persisted changed data.
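+
+A rough sketch of this write-side idea, using hypothetical stand-ins `CdcRecord`, `CdcLogWriter`, and
+`MergeHandleSketch` for the planned `CDCBlock`/`LogFormatWriter` wiring (these are not actual Hudi classes):
+
+```scala
+// Sketch only: buffer per-record change info while merging, then flush it as one CDC block.
+// CdcRecord, CdcLogWriter and MergeHandleSketch are hypothetical, not real Hudi APIs.
+final case class CdcRecord(op: String, key: String, before: Option[String], after: Option[String])
+
+trait CdcLogWriter {
+  // Would append a CDCBlock to a log file named like `.<fileId>-<instant>-cdc`.
+  def append(block: Seq[CdcRecord]): Unit
+}
+
+class MergeHandleSketch(cdcWriter: CdcLogWriter) {
+  private val buffer = scala.collection.mutable.ArrayBuffer[CdcRecord]()
+
+  // Called once per key with the old and new images seen during the merge.
+  def onMerge(key: String, oldVal: Option[String], newVal: Option[String]): Unit = (oldVal, newVal) match {
+    case (None, Some(n))    => buffer += CdcRecord("i", key, None, Some(n))
+    case (Some(o), None)    => buffer += CdcRecord("d", key, Some(o), None)
+    case (Some(o), Some(n)) => buffer += CdcRecord("u", key, Some(o), Some(n))
+    case (None, None)       => // nothing to record
+  }
+
+  // On handle close, persist the buffered changes alongside the base/log files.
+  def close(): Unit = if (buffer.nonEmpty) cdcWriter.append(buffer.toSeq)
+}
+```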
+
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+There are two design choices on when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest point. However, in the case of the Flink writer or bucket
+indexing, `op` (I/U/D) data is not available at indexing time.
+
+Write-on-compaction can always persist CDC info and standardizes the implementation logic across engines;
+however, it adds some delay to the CDC query results. Depending on the business requirements, Log Compaction (RFC-48) or
+scheduling more frequent compaction can be used to minimize the latency.
+
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
+
+- For Spark
+  - inserts are written to base files: the CDC data `op=I` will be persisted
+  - updates/deletes that are written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
+- For Flink
+  - inserts/updates/deletes that are written to log files are compacted into base files: the CDC data `op=I|U|D` will be
+    persisted
+
+In summary, we propose that CDC data should be persisted synchronously upon base files generation. It is therefore
+write-on-indexing for Spark inserts (non-bucket index) and write-on-compaction for everything else.
+
+- Note 1: CDC results can still be returned upon CDC-type query by doing on-the-fly inference, before compaction is
+  performed. Details are illustrated in the [Read](#cdcread) section below.
+- Note 2: it may also be necessary to provide capabilities for asynchronously persisting CDC data, in the form of a
+  separate table service like `ChangeTrackingService`, which can be scheduled to fine-tune the CDC availability SLA,
+  effectively decoupling it from compaction frequency.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Examples
 
-Spark DataSource example:
+Spark DataSource:
 
 ```scala
 df.write.format("hudi").
  options(commonOpts).
   option("hoodie.table.cdc.enabled", "true").
-  option("hoodie.table.cdc.supplemental.logging", "true"). //enable cdc supplemental logging 
-  // option("hoodie.table.cdc.supplemental.logging", "false"). //disable cdc supplemental logging 
+  option("hoodie.table.cdc.supplemental.logging.mode", "DATA_AFTER").
   save("/path/to/hudi")
 ```
 
-Spark SQL example:
+Spark SQL:
 
 ```sql
-- create a hudi table that enables cdc
@@ -174,20 +191,35 @@ create table hudi_cdc_table
 ) using hudi
 tblproperties (
     'primaryKey' = 'id',
-    'preCombineField' = 'ts',
+    'preCombineField' = 'ts_ms',
     'hoodie.table.cdc.enabled' = 'true',
-    'hoodie.table.cdc.supplemental.logging' = 'true',
+    'hoodie.table.cdc.supplemental.logging.mode' = 'DATA_BEFORE_AFTER',
     'type' = 'cow'
 )
 ```
 
-### Read
+### <a name="cdcread"></a>Read
+
+### How to infer CDC results
+
+| `HoodieCDCInferCase` | Infer case details                                                                                                        | Infer logic                                                                                                                                                               | Note                               |
+|----------------------|---------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------|
+| `AS_IS`              | CDC file written (suffix contains `-cdc`) alongside base files (COW) or log files (MOR)                                   | CDC info will be extracted as is                                                                                                                                          | the read-optimized way to read CDC |
+| `BASE_FILE_INSERT`   | Base files were written to a new file group                                                                               | All records (in the current commit): `op=I`, `before=null`, `after=<current value>`                                                                                       | on-the-fly inference               |
+| `BASE_FILE_DELETE`   | Records are not found from the current commit's base file but found in the previous commit's within the same file group   | All records (in the previous commit): `op=D`, `before=<previous value>`, `after=null`                                                                                     | on-the-fly inference               |
+| `LOG_FILE`           | For MOR, log files to be read and records to be looked up in the previous file slice.                                     | Current record read from delete block, found in previous file slice => `op=D`, `before=<previous value>`, `after=null`                                                    | on-the-fly inference               |
+| `LOG_FILE`           | ditto                                                                                                                     | Current record read from delete block, not found in previous file slice => skip, as this delete log entry should be discarded (it tries to delete a non-existing record)  | on-the-fly inference               |
+| `LOG_FILE`           | ditto                                                                                                                     | Current record not read from delete block, found in previous file slice => `op=U`, `before=<previous value>`, `after=<current value>`                                     | on-the-fly inference               |
+| `LOG_FILE`           | ditto                                                                                                                     | Current record not read from delete block, not found in previous file slice => `op=I`, `before=null`, `after=<current value>`                                             | on-the-fly inference               |
+| `REPLACE_COMMIT`     | File group corresponds to a replace commit                                                                                | All records `op=D`, `before=<value from the file group>`, `after=null`                                                                                                    | on-the-fly inference               |
+
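+The `LOG_FILE` rows above amount to a small case analysis. A hedged sketch of that logic (the input names
+`fromDeleteBlock`, `prevImage`, `currImage` and the `CdcRow` type are illustrative, not the actual reader API):
+
+```scala
+// Sketch only: the LOG_FILE inference cases expressed as a pattern match.
+final case class CdcRow(op: String, before: Option[String], after: Option[String])
+
+def inferLogFileCase(fromDeleteBlock: Boolean,
+                     prevImage: Option[String],
+                     currImage: Option[String]): Option[CdcRow] =
+  (fromDeleteBlock, prevImage) match {
+    case (true, Some(prev))  => Some(CdcRow("d", Some(prev), None))      // delete of an existing record
+    case (true, None)        => None                                     // deleting a non-existing record: skip
+    case (false, Some(prev)) => Some(CdcRow("u", Some(prev), currImage)) // update
+    case (false, None)       => Some(CdcRow("i", None, currImage))       // insert
+  }
+```
+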
+### Illustrations
 
-This part just discuss how to make Spark (including Spark DataFrame, SQL, Streaming) to read the Hudi CDC data.
+This section uses Spark (incl. Spark DataFrame, SQL, Streaming) as an example to show how to perform CDC-format incremental queries.
 
 Implement `CDCReader`, which performs these steps to serve a CDC request:
 
-- judge whether this is a table that has enabled `hoodie.table.cdc.enabled`, and the query range is valid.
+- check if `hoodie.table.cdc.enabled=true`, and if the query range is valid.
 - extract and filter the commits needed from `ActiveTimeline`.
 - For each commit, get and load the changed files, union the results, and return a `DataFrame`.
   - We extract the data differently according to the file type; see the inference cases described above for details.
@@ -207,7 +239,6 @@ class CDCReader(
     filters: Array[Filter]): RDD[Row] = {
   // ...
   }
-
 }
 ```
 
@@ -215,18 +246,24 @@ Note:
 
 - Only instants that are active can be queried in a CDC scenario.
 - `CDCReader` manages everything related to CDC, and all the Spark entry points (DataFrame, SQL, Streaming) call the functions in `CDCReader`.
-- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
+- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.
 
 ![](read_cdc_log_file.jpg)
 
 #### COW table
 
-Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
+CDC queries always extract and return the persisted CDC data.
 
 #### MOR table
 
-According to the design of the writing part, only the cases where writing mor tables will write out the base file (which call the `HoodieMergeHandle` and it's subclasses) will write out the cdc files.
-In other words, cdc files will be written out only for the index and file size reasons.
+According to the section "Persisting CDC in MOR", persisted CDC data is available upon base files' generation. The CDC-type query results
+should consist of on-the-fly read results and as-is read results. See the [Read](#cdcread) section.
+
+The implementation should
+
+- compute the results on-the-fly by reading log files and the corresponding base files (current and previous file slices).
+- extract the results by reading persisted CDC data and the corresponding base files (current and previous file slices).
+- stitch the results from the previous two steps and return the complete, freshest results (sketched below)
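+
+A minimal sketch of the stitching step, assuming two hypothetical DataFrames `inferredCdc` (computed on the fly from
+log files) and `persistedCdc` (read as-is from `-cdc` files), both already in the CDC output schema with a `ts` column:
+
+```scala
+// Sketch only: combine on-the-fly inferred CDC rows with persisted CDC rows.
+// inferredCdc and persistedCdc are hypothetical DataFrames sharing the CDC output schema.
+import org.apache.spark.sql.DataFrame
+
+def stitch(inferredCdc: DataFrame, persistedCdc: DataFrame): DataFrame =
+  // Each file slice contributes to exactly one of the two inputs, so a union suffices;
+  // ordering by the change timestamp keeps the stream in commit order.
+  persistedCdc.unionByName(inferredCdc).orderBy("ts")
+```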
 
 Here we use an illustration to explain how we can query CDC on a MOR table in various cases.
 
@@ -238,7 +275,8 @@ Spark DataSource
 
 ```scala
 spark.read.format("hudi").
-  option("hoodie.datasource.query.type", "cdc").
+  option("hoodie.datasource.query.type", "incremental").
+  option("hoodie.datasource.query.incremental.format", "cdc").
   option("hoodie.datasource.read.begin.instanttime", "20220426103000000").
   option("hoodie.datasource.read.end.instanttime", "20220426113000000").
   load("/path/to/hudi")
@@ -261,7 +299,7 @@ Spark Streaming
 
 ```scala
 val df = spark.readStream.format("hudi").
-  option("hoodie.datasource.query.type", "cdc").
+  option("hoodie.datasource.query.type", "incremental").
   load("/path/to/hudi")
 
 // launch a streaming which starts from the current snapshot of the hudi table,
@@ -271,11 +309,32 @@ val stream = df.writeStream.format("console").start
 
 # Rollout/Adoption Plan
 
-The CDC feature can be enabled by the corresponding configuration, which is default false. Using this feature dos not depend on Spark versions.
+Spark support phase 1
+
+- For COW: support Spark CDC write/read fully
+- For MOR: support Spark CDC write (only `op=I` when inserts are written to base files) and CDC on-the-fly inferred read.
+
+Spark support phase 2
+
+- For MOR: Spark CDC write (`op=U|D` when updates/deletes in log files are compacted) and CDC read that combines on-the-fly
+  inferred data and persisted CDC data.
+  - Note: for CDC write via compaction, `HoodieMergedLogRecordScanner` needs to support producing CDC data for each
+    version of the changed records. `HoodieCompactor` and `HoodieMergeHandle` will need to adapt to these
+    changes. See [HUDI-4705](https://issues.apache.org/jira/browse/HUDI-4705).
+
+Flink support can be developed in parallel, and can make use of the common logic changes for CDC write via compaction from
+Spark support phase 2.
 
 # Test Plan
 
-- [ ] Unit tests for this
-- [ ] Production end-to-end integration test
-- [ ] Benchmark snapshot query for large tables
+- Unit tests for this feature
+- Production end-to-end integration test
+- Benchmark snapshot query for large tables
+
+# Appendix
+
+## Affected code paths
+
+For `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`
 
+![](code-paths.jpg)
diff --git a/rfc/rfc-51/scenario-definition.jpg b/rfc/rfc-51/scenario-illustration.jpg
similarity index 100%
rename from rfc/rfc-51/scenario-definition.jpg
rename to rfc/rfc-51/scenario-illustration.jpg