You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/18 06:10:26 UTC

[GitHub] [hudi] dik111 opened a new issue #4030: [SUPPORT] Flink uses updated fields to update data

dik111 opened a new issue #4030:
URL: https://github.com/apache/hudi/issues/4030


   At some point, we can only get the updated fields. For example, Table A has three fields (a, b, c), but only two fields (a, b) are in the updated-changelog. Can we support updating only some fields operate in flink ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan closed issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
nsivabalan closed issue #4030:
URL: https://github.com/apache/hudi/issues/4030


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1018599218


   By incremental field updates, do you mean partial field updates. I believe we have this functionality in sql-dml. But not sure on the flink. 
   I will let danny speak for that. 
   CC @xushiyan @YannByron 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] stayrascal edited a comment on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
stayrascal edited a comment on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1025077842


   Hi @danny0405 , I tired the solution of changing the ValueState of BucketAssignFunction by store the whole HoodieRecord instead of HoodieRecordGlobalLocation. (Once the partition changed, output a delete record to old file, and update the location of old record with new partition path and output a new record to new file)
   
   It works in some cases, but not works in all cases.
   
   **Works cases:**
   - The old record exists in based file(enable bootstrap index), the incoming record partition changed. 
     - The new record from old record and incoming record will be merged before write to new partition file.
   - Only one incoming record before another incoming record with partition changed, these two records will be merged to new  partition file.
     - Assume incoming records a(a=1,b=null,c=2022-01-01) and b(a=null, b=2,p=2022-01-02) coming in a same commit, the a will be stored in ValueState, and it will be sink to 2022-01-01 partition file, and later the b coming, a delete record(c=2022-01-01) will be outputted(which will merge with previous record, no records write to 2021-01-01 file), and the same time, a new record a1(a=1, b=null, c=2022-01-02) will outputted as well, the a1 and b will be merged and then write to 2022-01-02 partition file
   
   **Not Works cases:**
   - there are more than one incoming record before another incoming record with partition changed, only the last record(before partition changed) will be merged.
     - Assume incoming records a(a=1, b=null, c=null, d=2022-01-01), b(a=null, b=2, c=null, d=2022-01-01) and c(a=null, b=null, c=3, d=2022-01-02) coming in a same commit
       - the a and b will be sink to downstream(StreamWriteFunction) with location 2022-01-01 at first.
       - a delete record from b will be sink to downstream with location 2022-01-01 later when record c coming
       - a new record from b1(a=null, b=2, c=null, d=2022-01-02) will sink to downstream with location 2022-01-02
       - record b1 and c will be merged and write to 2022-01-02 partition, but the info from record a will missed, because the ValueState will only store one element.
   
   So in order to support the partial update or overwrite non default(exists) capabilities in current architecture, we might need to change ValueState to ListState(or ValueState with customized object) to store recent n records or merge the record before store in state, once the partition changed, rewrite these latest n records to new partition and clear the state. The above all logic should be control by a feature toggle via a configuration. (notes: the n means only merged the latest n records in a same commit if the partition path changed)
   
   What's your thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-974749801


   @dik111 , your use case is reasonable and valid, i have created a JIRA issue to track this request: https://issues.apache.org/jira/browse/HUDI-2815


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] stayrascal commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
stayrascal commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1022865829


   > 
   
   Thanks for replay.
   
   I'm thinking how about change the ValueState of  `BucketAssignFunction` by store the whole `HoodieRecord` instead of `HoodieRecordGlobalLocation`, and then once the partition path changed, beside output a delete record on old file(partition), and also we can update the partition path of old record, and output the updated record new file(partition).
   
   And later, the incoming record and updated record(from old record) will be `#preCombine` in `StreamWriteFunction`.
   
   Not sure is there any other impact that we use ValueState to store whole HoodieRecord instead of `HoodieRecordGlobalLocation` except the state size will bigger?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-974750173


   I just see that payload `OverwriteNonDefaultsWithLatestAvroPayload` solves partial of the problems, it has some space to improve: the `#combineAndGetUpdateValue` should compare record version based on event time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] stayrascal commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
stayrascal commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1022340911


   Hi, any new update on this one. 
   
   After review the PR and source code and try to contribute, I find it seems that the partial update cannot support the case that the partition path is changed. Because once a new record coming and if the partition path changed, `BucketAssignFunction` will output two records: one deleted record on old partition, and one new record, and these two records will be assigned to two StreamWriteFunction(HoodieWriteHandle), each write handler only can handle one fileId and incoming/delete record, which means we can merge/partial-update the incoming record with the record exists in old base file. Otherwise, we might need to implement a write handle which can across different file ID.
   
   Please correct me if my understanding is wrong.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] stayrascal commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
stayrascal commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1025077842


   Hi @danny0405 , I tired the solution of changing the ValueState of BucketAssignFunction by store the whole HoodieRecord instead of HoodieRecordGlobalLocation. (Once the partition changed, output a delete record to old file, and update the location of old record with new partition path and output a new record to new file)
   
   It works in some cases, but not works in all cases.
   
   **Works cases:**
   - The old record exists in based file(enable bootstrap index), the incoming record partition changed. 
     - The new record from old record and incoming record will be merged before write to new partition file.
   - Only one incoming record before another incoming record with partition changed, these two records will be merged to new  partition file.
     - Assume incoming records a(a=1,b=null,c=2022-01-01) and b(a=null, b=2,p=2022-01-02) coming in a same commit, the a will be stored in ValueState, and it will be sink to 2022-01-01 partition file, and later the b coming, a delete record(c=2022-01-01) will be outputted(which will merge with previous record, no records write to 2021-01-01 file), and the same time, a new record a1(a=1, b=null, c=2022-01-02) will outputted as well, the a1 and b will be merged and then write to 2022-01-02 partition file
   
   **Not Works cases:**
   - there are more than one incoming record before another incoming record with partition changed, only the last record(before partition changed) will be merged.
     - Assume incoming records a(a=1, b=null, c=null, d=2022-01-01), b(a=null, b=2, c=null, d=2022-01-01) and c(a=null, b=null, c=3, d=2022-01-02) coming in a same commit
       - the a and b will be sink to downstream(StreamWriteFunction) with location 2022-01-01 at first.
       - a delete record from b will be sink to downstream with location 2022-01-01 later when record c coming
       - a new record from b1(a=null, b=2, c=null, d=2022-01-02) will sink to downstream with location 2022-01-02
       - record b1 and c will be merged and write to 2022-01-02 partition, but the info from record a will missed, because the ValueState will only store one element.
   
   So in order to support the partial update or overwrite non default(exists) capabilities in current architecture, we might need to change ValueState to ListState(or ValueState with customized object) to store recent n records, once the partition changed, rewrite these latest n records to new partition and clear the state. The above all logic should be control by a feature toggle via a configuration. (notes: the n means only merged the latest n records in a same commit if the partition path changed)
   
   What's your thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] dik111 commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
dik111 commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-983379536


   > Hi, @dik111 I want to sure with you, is your use case occur a schema evolution in updated changelog or schema in fixed, only lack of some fields in updated changelog? Serializing the `GenericRecord` to bytes in hudi will thrown NPE when some fields in schema is not filled into avro `GenericRecord` as your describe case.
   
   HI ,@lsyldliu Yes, our company uses oracle's ogg for log capture, but some tables are too large, and all fields will cause a great delay, so we changed to incremental fields. Can hudi support incremental field updates? This will help us a lot!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-974749801






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lsyldliu edited a comment on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
lsyldliu edited a comment on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-980515305


   Hi, @dik111 I want to sure with you, is your use case occur a schema evolution in updated changelog or schema in fixed, only lack of some fields in updated changelog? Serializing the `GenericRecord` to bytes in hudi will thrown NPE  when  some fields in schema is not filled into avro `GenericRecord` as your describe case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] stayrascal edited a comment on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
stayrascal edited a comment on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1022865829


   > 
   
   Thanks for replay.
   
   I'm thinking how about change the ValueState of  `BucketAssignFunction` by store the whole `HoodieRecord` instead of `HoodieRecordGlobalLocation`, and then once the partition path changed, beside output a delete record on old file(partition), and also we can update the partition path of old record, and output the updated record on new file(partition).
   
   And later, the incoming record and updated record(from old record) will be `#preCombine` in `StreamWriteFunction`.
   
   Not sure is there any other impact that we use ValueState to store whole HoodieRecord instead of `HoodieRecordGlobalLocation` except the state size will bigger?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1018600325


   oh, just now noticed we have a PR put up already for the flink. I will go ahead and close the issue out for now. Feel free to continue discussions on the jira and PR. Or open up a new issue if you have any asks. thanks! @dik111 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lsyldliu commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-980515305


   Hi, @dik111 I want to sure with you, is your use case occur a schema evolution in updated changelog or schema in fixed, only lack of some fields in updated changelog?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] danny0405 commented on issue #4030: [SUPPORT] Flink uses updated fields to update data

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #4030:
URL: https://github.com/apache/hudi/issues/4030#issuecomment-1022785250


   > Hi, any new update on this one.
   > 
   > After review the PR and source code and try to contribute, I find it seems that the partial update cannot support the case that the partition path is changed. Because once a new record coming and if the partition path changed, `BucketAssignFunction` will output two records: one deleted record on old partition, and one new record, and these two records will be assigned to two StreamWriteFunction(HoodieWriteHandle), each write handler only can handle one fileId and incoming/delete record, which means we can merge/partial-update the incoming record with the record exists in old base file. Otherwise, we might need to implement a write handle which can across different file ID.
   > 
   > Please correct me if my understanding is wrong.
   
   Yes, your understanding is correct. Currently it is a little hard in the project to have efficient point look-up, we can not query the old partition record first before we write new, this is the reason why the current code just send a DELETE record with primary key.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org