Posted to commits@hudi.apache.org by "kazdy (Jira)" <ji...@apache.org> on 2023/03/27 20:40:00 UTC

[jira] [Comment Edited] (HUDI-5707) Support offset reset strategy w/ spark streaming read from hudi table

    [ https://issues.apache.org/jira/browse/HUDI-5707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705663#comment-17705663 ] 

kazdy edited comment on HUDI-5707 at 3/27/23 8:39 PM:
------------------------------------------------------

I think we can follow what the Kafka structured streaming source does.
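For reference, this is how the Kafka source surfaces the same choices today (these are real Spark options; the broker and topic names below are placeholders):

{code:scala}
// Spark's Kafka source: failOnDataLoss decides whether to fail or keep going
// when checkpointed offsets have been deleted from the topic, and
// maxOffsetsPerTrigger rate-limits each microbatch. spark is a SparkSession.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "events")                    // placeholder topic
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")       // don't fail; resume from what's available
  .option("maxOffsetsPerTrigger", "10000") // cap records per microbatch
  .load()
{code}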

1. hoodie.datasource.read.streaming.failOnDataLoss: true || false (default false). When false, if the start commit offset is no longer available in the active timeline, start reading from the earliest available offset (detecting the earliest available offset is already implemented); it's probably a good idea to put something in the WARN log as well. When true, throw an exception informing about potential data loss, which should stop the streaming query execution.
2. hoodie.datasource.read.streaming.fallback.fulltablescan.enable: true || false (default false). When true and failOnDataLoss is false (the default), fall back to a full table scan when the "start offset" commit has been cleaned or archived. We don't want to read from the earliest commit in the table in this case, but rather treat cleaned/archived commits as if they were still available in the active timeline.
3. hoodie.datasource.read.streaming.maxInstantsPerTrigger: a rate limit on how many instants/commits are fetched per microbatch. This would also let the new AvailableNow trigger work properly; at the moment it pulls all the changes, same as Trigger Once. (A combined usage sketch follows this list.)
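A minimal usage sketch with all three proposed options together (none of these config keys exist yet, and the table path is a placeholder):

{code:scala}
// Sketch only: the three proposed streaming-read options applied to a Hudi
// streaming read. These config keys are proposals, not existing options.
val hudiStream = spark.readStream
  .format("hudi")
  .option("hoodie.datasource.read.streaming.failOnDataLoss", "false")
  .option("hoodie.datasource.read.streaming.fallback.fulltablescan.enable", "true")
  .option("hoodie.datasource.read.streaming.maxInstantsPerTrigger", "10")
  .load("/path/to/hudi_table") // placeholder path
{code}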

It might be a good idea to introduce hoodie.datasource.read.streaming.maxBytesPerTrigger in the future (best effort, since we can't split one commit instant across multiple triggers).

I think I don't want to introduce auto.offset.reset for Hudi, even though it keeps some checkpoints internally for structured streaming (at least for the batch id). I would like to stay as close to what Spark offers as possible.
If the user wants to reset checkpoints, the procedure is (sketched in the code below):
 - remove the Spark checkpoint or change the checkpoint location,
 - change hoodie.datasource.write.streaming.checkpoint.identifier to another value to reset the internal Hudi checkpoint,
 - set hoodie.datasource.streaming.startOffset to earliest/latest or a specific instant, whichever the user needs.
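A sketch of the whole reset, using the existing config keys named above (paths, identifier, and offset values are placeholders; table-level write configs are omitted):

{code:scala}
// Sketch of the reset procedure above; step numbers refer to the list.
val resumed = spark.readStream
  .format("hudi")
  // step 3: restart point: "earliest", "latest", or a specific instant time
  .option("hoodie.datasource.streaming.startOffset", "earliest")
  .load("/path/to/source_table") // placeholder path

resumed.writeStream
  .format("hudi")
  // step 2: a new identifier resets Hudi's internal streaming checkpoint
  .option("hoodie.datasource.write.streaming.checkpoint.identifier", "my-query-v2")
  // step 1: a new location discards the old Spark checkpoint
  .option("checkpointLocation", "/checkpoints/my-query-v2") // placeholder path
  .start("/path/to/target_table") // placeholder; other write configs omitted
{code}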

(I would gladly rename hoodie.datasource.streaming.startOffset to hoodie.datasource.read.streaming.startOffset to keep the read config under the read.streaming. prefix.)

Regarding CDC stream reads: hoodie.datasource.read.streaming.fallback.fulltablescan.enable will not work for a CDC query, so it would be good to throw an exception, so that when users want to do a CDC read with the full table scan fallback enabled, Hudi prevents them from doing so. Or just disable the fallback and log a warning.
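A hypothetical guard for this (validateStreamOptions is made up for illustration, and I'm assuming the CDC read is flagged via hoodie.datasource.query.incremental.format=cdc):

{code:scala}
// Hypothetical validation, not actual Hudi code: reject the full-table-scan
// fallback when a CDC streaming read is requested, since a snapshot scan
// cannot produce CDC-formatted change records.
def validateStreamOptions(opts: Map[String, String]): Unit = {
  val isCdcRead = opts.get("hoodie.datasource.query.incremental.format").contains("cdc")
  val fallbackEnabled = opts
    .get("hoodie.datasource.read.streaming.fallback.fulltablescan.enable")
    .exists(_.toBoolean)
  if (isCdcRead && fallbackEnabled) {
    throw new IllegalArgumentException(
      "Full table scan fallback is not supported for CDC streaming reads; " +
        "disable hoodie.datasource.read.streaming.fallback.fulltablescan.enable")
  }
}
{code}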


> Support offset reset strategy w/ spark streaming read from hudi table
> ---------------------------------------------------------------------
>
>                 Key: HUDI-5707
>                 URL: https://issues.apache.org/jira/browse/HUDI-5707
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: reader-core
>            Reporter: sivabalan narayanan
>            Assignee: kazdy
>            Priority: Major
>             Fix For: 1.0.0
>
>
> For users reading a Hudi table in a streaming manner, we need to support an offset reset strategy if the commit of interest is archived or cleaned up.
>  
> Notes from the issue:
> In a streaming read, the user might want to get all incremental changes. From what I see, this is nothing but an incremental query on a Hudi table. With an incremental query, we do have a fallback mechanism via {{hoodie.datasource.read.incr.fallback.fulltablescan.enable}}.
> But in a streaming read, the amount of data read might spike up (if we plan to do the same), and the user may not have provisioned enough resources for the job.
> I am wondering if we should add something like the {{auto.offset.reset}} we have in Kafka. If there is something similar for streaming reads in Spark itself, we can leverage it; otherwise we can add a new config in Hoodie.
> So, users can configure what they want to do in such cases:
>  # Whether they wish to resume reading from the earliest valid commit in Hudi (the implementation might be involved, since we need to detect the earliest commit that hasn't been cleaned by the cleaner yet).
>  # Or do a snapshot query with the latest table state.
>  # Fail the streaming read.
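For context, the existing batch-side fallback the issue refers to looks roughly like this (existing config keys; the begin instant time and path are placeholders):

{code:scala}
// Batch incremental query with the existing full-table-scan fallback enabled;
// if files for the requested range were cleaned, Hudi falls back to a snapshot scan.
val incremental = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20230101000000") // placeholder
  .option("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "true")
  .load("/path/to/hudi_table") // placeholder path
{code}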


