Posted to issues@flink.apache.org by "Lsw_aka_laplace (Jira)" <ji...@apache.org> on 2020/10/19 10:20:00 UTC

[jira] [Comment Edited] (FLINK-19706) Introduce `Repeated Partition Commit Check` in `org.apache.flink.table.filesystem.PartitionCommitPolicy`

    [ https://issues.apache.org/jira/browse/FLINK-19706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216601#comment-17216601 ] 

Lsw_aka_laplace edited comment on FLINK-19706 at 10/19/20, 10:19 AM:
---------------------------------------------------------------------

[~lzljs3620320]

Hi Jingsong,

>  We can not store all partitions, and we can not a cache only too.

Actually, approach 1 does not have this problem: it queries the partition and checks whether it already exists, so there is no need to store every partition that has been committed. I also prefer approach 1. (Please see one of the implementations that I just posted.)
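
A minimal sketch of the idea behind approach 1, for readers who cannot see the screenshots. Only the method name `checkPartitionExists` comes from the proposal; the class names, the `String` parameters, the `onRepeatedCommit` hook, and the in-memory "metastore" are hypothetical stand-ins, not Flink's actual `PartitionCommitPolicy` API. The sketch also assumes the commit still goes ahead after a repeat is reported, since the goal is detection rather than prevention.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical base class, not Flink's real PartitionCommitPolicy interface.
abstract class CheckingCommitPolicy {
    /** Asks the external system (e.g. a metastore) whether the partition is already there. */
    protected abstract boolean checkPartitionExists(String table, String partition);

    /** Performs the actual commit (e.g. adds the partition to the metastore). */
    protected abstract void doCommit(String table, String partition);

    /** Reporting hook; a real job would likely bump a metric or raise an alert instead. */
    protected void onRepeatedCommit(String table, String partition) {
        System.err.println("Repeated partition commit: " + table + "/" + partition);
    }

    /** Checks first, reports a repeat, then commits as usual. Returns true for a repeat. */
    public final boolean commit(String table, String partition) {
        boolean repeated = checkPartitionExists(table, partition);
        if (repeated) {
            onRepeatedCommit(table, partition);
        }
        doCommit(table, partition);
        return repeated;
    }
}

// In-memory stand-in for a metastore-backed subclass, for illustration only.
class InMemoryMetastorePolicy extends CheckingCommitPolicy {
    private final Set<String> partitions = new HashSet<>();

    @Override
    protected boolean checkPartitionExists(String table, String partition) {
        return partitions.contains(table + "/" + partition);
    }

    @Override
    protected void doCommit(String table, String partition) {
        partitions.add(table + "/" + partition);
    }
}
```

Because the lookup goes to the system that owns the partitions, the job itself keeps nothing; the price is one extra query per commit.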

> What about change the way to late event processing?

Coincidentally, I had the same thought and ran some experiments, but unfortunately they failed. The reasons are as follows.
 # The fileStreamingFileWriter-related logic is complex (at least for me). It took me a long time to understand the StreamFileWriting process, especially writing to the bucket, rolling, and committing. It is certainly even harder to modify this process or to insert users' logic into it. (A user who knows little about it will easily make mistakes.)
 # How do we define the lateness of data, and how do we judge whether an element is late? The FileWriter may know nothing about whether a partition exists (only the committer can know this), so it cannot determine whether the partition that the current element belongs to has been committed. What should we do if the data is late according to the watermark but the partition has not been committed (which means the data should not be treated as late data)? What is more, it is hard to set a proper lateness period. If it is too small, too many elements will be regarded as late data, and both poor performance and fatal logic errors can occur. If it is too big, this measure may not handle lateness/repeated commits correctly.
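
The mismatch described in the second point can be illustrated with a small sketch. Everything here is hypothetical: the class, the method names, and the timestamps are made up, and the "late" rule (event time plus allowed lateness is behind the watermark) is only one plausible definition. The point is that the writer can only apply the watermark rule, while whether the partition was committed is known elsewhere.

```java
// Hypothetical illustration; none of these names exist in Flink.
class LatenessMismatch {
    enum Verdict { AGREE, FALSE_ALARM, MISSED }

    /** Writer-side guess: the watermark has passed the event time plus the allowed lateness. */
    static boolean isLateByWatermark(long eventTime, long watermark, long allowedLateness) {
        return eventTime + allowedLateness < watermark;
    }

    /** Compares the writer-side guess with what the committer actually knows. */
    static Verdict compare(boolean lateByWatermark, boolean partitionCommitted) {
        if (lateByWatermark == partitionCommitted) {
            return Verdict.AGREE;
        }
        // Flagged late although the partition is still open, or not flagged although it is closed.
        return lateByWatermark ? Verdict.FALSE_ALARM : Verdict.MISSED;
    }
}
```

With an event time of 100 and a watermark of 110, an allowed lateness of 5 flags the element as late even if its partition is still uncommitted (`FALSE_ALARM`), while an allowed lateness of 60 lets it through even if the partition was already committed (`MISSED`).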

So, as far as I am concerned, late event processing should be the eventual solution to this problem. But its implementation cannot be accomplished without a lot of effort, and even once it has been implemented, it will still be hard to understand and use (from my perspective). My solution may not directly solve or fix the problem, but it is an efficient way to detect the problem and it is easy to implement. Maybe we can use my solution for now, and late event processing can be the long-term, eventual solution.

> Introduce `Repeated Partition Commit Check` in `org.apache.flink.table.filesystem.PartitionCommitPolicy` 
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19706
>                 URL: https://issues.apache.org/jira/browse/FLINK-19706
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / Runtime
>            Reporter: Lsw_aka_laplace
>            Priority: Minor
>         Attachments: image-2020-10-19-16-47-39-354.png, image-2020-10-19-16-57-02-661.png, image-2020-10-19-17-00-27-255.png, image-2020-10-19-17-03-21-558.png, image-2020-10-19-18-16-35-083.png
>
>
> Hi all,
>       Recently we have been working on using Hive streaming writes to speed up data sync for our Hive-based data warehouse, and we eventually succeeded.
>       For production use, we added many metrics, logs, and other measures to help us analyze runtime information and fix unexpected problems. Among these, we found that checking for repeated partition commits is the most important one, so we would like to contribute it back to the community.
>       If this proposal is meaningful, I am happy to present my design and implementation.
>  
> Looking forward to any opinions~
>  
>  
> ----UPDATE ----
>  
> Our users (who use our own platform to build their own Flink jobs) raised some requests. One of them is that once a partition is committed, the data in this partition is regarded as frozen or complete. Committing a partition works as a kind of guarantee (although we all know it is hard to make it a promise) that tells us the partition is complete. We certainly take many measures to try to ensure that a partition commit means the partition is complete. So if a partition is committed two or more times, then for us something must be wrong or our measures are insufficient. On the other hand, it also tells us to do something to compensate, in order to avoid data loss or incomplete data.
>  
> So first of all, it is important for us to know when a certain partition has been committed repeatedly, so that we can do the following as soon as possible:
>    1. analyze the cause
>    2. do some trade-off operations
>    3. improve our code/measures
>  
> ---- Design and Implementation ----
> There are basically two ways, and both have been used in our production environment.
> 1. Add a check to CommitPolicy and call it before the partition commit
> !image-2020-10-19-16-47-39-354.png|width=576,height=235!
> //{color:#ffab00}Newly posted, see here{color}
> !image-2020-10-19-18-16-35-083.png|width=725,height=313!
>  1.1 As the picture shows, add `checkPartitionExists` and implement it in the subclass
>   !image-2020-10-19-17-03-21-558.png|width=1203,height=88!
>  1.2 Call `checkPartitionExists` before the partition commit
>  
> 2. Build a bounded cache of committed partitions and check it every time before a partition commit
> (this cache is actually supposed to be operator state)
> !image-2020-10-19-16-57-02-661.png|width=1298,height=57!
>   2.1 Build a cache
> !image-2020-10-19-17-00-27-255.png|width=1235,height=116!
>   2.2 Check before commit
>  
>  
>  
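
Approach 2 from the description above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class name `CommittedPartitionCache` and the method `markCommitted` are hypothetical, the cache lives on the heap instead of in operator state, and least-recently-used eviction is just one possible way to keep it bounded.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded cache of committed partitions; not part of Flink.
class CommittedPartitionCache {
    private final Map<String, Boolean> seen;

    CommittedPartitionCache(final int capacity) {
        // accessOrder = true makes the map evict the least recently used entry.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Records the partition and returns true if it was already in the cache. */
    boolean markCommitted(String partition) {
        return seen.put(partition, Boolean.TRUE) != null;
    }

    int size() {
        return seen.size();
    }
}
```

Calling `markCommitted` right before each commit reports a repeat as long as the partition is still cached. Once an entry has been evicted, a repeated commit of that partition goes unnoticed, which is the limitation of a cache-only check.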



--
This message was sent by Atlassian Jira
(v8.3.4#803005)