You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "dennysong (Jira)" <ji...@apache.org> on 2023/06/19 07:27:00 UTC

[jira] [Updated] (HUDI-6409) Spark Incremental read of MOR surpports latest_state_with_before format

     [ https://issues.apache.org/jira/browse/HUDI-6409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dennysong updated HUDI-6409:
----------------------------
    Description: 
The current incremental query for the time range *(t1, t2]* returns the latest record values at *t2.* However, there are situations where we need the record values at *t1* to obtain insert/update information. This is particularly useful in cases of data rollback, as we need to retrieve this delta information(insert/update) from Hudi to roll back external storage (such as Hbase in downstream systems).

Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns all commit change records between (t1, t2]{*}. This can be somewhat inefficient and wasteful of resources when we only want to know the values at t1 and t2. Furthermore, the returned records in CDC mode are in {*}JSON format with a unified schema(a customized format for CDC scenarios){*}, which differs from the user's original schema. This makes using CDC more difficult compared to snapshot/incremental reads.

An alternative option to obtain both record values at t1 and t2 is to rely on the existing incremental query implementation.

We can include the record value at t1 (before value) when returning the latest record value at t2 in incremental read. For example:

When `hoodie.datasource.query.incremental.format=latest_state`, the returned record is:
{code:java}
column1: string, column2: array<string>{code}
When `hoodie.datasource.query.incremental.format=latest_state_with_before`, the returned record is:
{code:java}
column1: string, column1_before_: string, column2: array<string>, column2_before_: array<string>,{code}
The implementation is simple. Sort both the base file and log file in advance(e.g., compact in MOR), and then perform an ordered merge to combine them.

  was:
The current incremental query for the time range *(t1, t2]* returns the latest record values at *t2.* However, there are situations where we need the record values at *t1* to obtain insert/update information. This is particularly useful in cases of data rollback, as we need to retrieve this delta information(insert/update) from Hudi to roll back external storage (such as Hbase in downstream systems).

 

Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns all commit change records between (t1, t2]{*}. This can be somewhat inefficient and wasteful of resources when we only want to know the values at t1 and t2. Furthermore, the returned records in CDC mode are in {*}JSON format with a unified schema(a customized format for CDC scenarios){*}, which differs from the user's original schema. This makes using CDC more difficult compared to snapshot/incremental reads.

 

An alternative option to obtain both record values at t1 and t2 is to rely on the existing incremental query implementation.

We can include the record value at t1 (before value) when returning the latest record value at t2 in incremental read. For example:

When `hoodie.datasource.query.incremental.format=latest_state`, the returned record is:

 
{code:java}
column1: string, column2: array<string>{code}
When `hoodie.datasource.query.incremental.format=latest_state_with_before`, the returned record is:

 

 
{code:java}
column1: string, column1_before_: string, column2: array<string>, column2_before_: array<string>,{code}
 

The implementation is simple. Sort both the base file and log file in advance(e.g., compact in MOR), and then perform an ordered merge to combine them.

 

 


> Spark Incremental read of MOR surpports latest_state_with_before format
> -----------------------------------------------------------------------
>
>                 Key: HUDI-6409
>                 URL: https://issues.apache.org/jira/browse/HUDI-6409
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: spark
>            Reporter: dennysong
>            Priority: Major
>         Attachments: incremental_format.svg
>
>
> The current incremental query for the time range *(t1, t2]* returns the latest record values at *t2.* However, there are situations where we need the record values at *t1* to obtain insert/update information. This is particularly useful in cases of data rollback, as we need to retrieve this delta information(insert/update) from Hudi to roll back external storage (such as Hbase in downstream systems).
> Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns all commit change records between (t1, t2]{*}. This can be somewhat inefficient and wasteful of resources when we only want to know the values at t1 and t2. Furthermore, the returned records in CDC mode are in {*}JSON format with a unified schema(a customized format for CDC scenarios){*}, which differs from the user's original schema. This makes using CDC more difficult compared to snapshot/incremental reads.
> An alternative option to obtain both record values at t1 and t2 is to rely on the existing incremental query implementation.
> We can include the record value at t1 (before value) when returning the latest record value at t2 in incremental read. For example:
> When `hoodie.datasource.query.incremental.format=latest_state`, the returned record is:
> {code:java}
> column1: string, column2: array<string>{code}
> When `hoodie.datasource.query.incremental.format=latest_state_with_before`, the returned record is:
> {code:java}
> column1: string, column1_before_: string, column2: array<string>, column2_before_: array<string>,{code}
> The implementation is simple. Sort both the base file and log file in advance(e.g., compact in MOR), and then perform an ordered merge to combine them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)