Posted to commits@hudi.apache.org by "Xianjin YE (Jira)" <ji...@apache.org> on 2020/11/16 15:57:00 UTC

[jira] [Updated] (HUDI-1397) Different behavior between RealtimeCompactedRecordReader and HoodieMergeOnReadRDD

     [ https://issues.apache.org/jira/browse/HUDI-1397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xianjin YE updated HUDI-1397:
-----------------------------
    Description: 
Hi, while writing our internal payload class, we found that Hudi's `RealtimeCompactedRecordReader` behaves differently from `HoodieMergeOnReadRDD`.

Specifically, when reading a delta record with merging of log and base files enabled, the expected behavior is to merge the base record with the delta record. `HoodieMergeOnReadRDD` handles this correctly in its `mergeRowWithLog` method:
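{code:scala}
// Merge the current base-file row with the log record that has the same key.
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
  // Convert the base row to Avro so the payload can combine it with the log record.
  val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
  // Let the payload class decide the merged result instead of assuming the log record wins.
  logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
}
{code}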
However, `RealtimeCompactedRecordReader` has no similar logic; it simply assumes the record in the delta log is the latest.
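
To make the difference concrete, here is a minimal, self-contained sketch. It is not Hudi code: `MergeSketch`, `Payload`, `PartialUpdatePayload`, `readLogOnly` and `readMerged` are hypothetical names used only for illustration, and records are plain maps rather than Avro records. It assumes a custom payload whose log records carry only the changed fields, which is the kind of case where the two read paths would diverge.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; none of these are Hudi classes.
class MergeSketch {

    /** Hypothetical payload contract, loosely modeled on combineAndGetUpdateValue. */
    interface Payload {
        Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> baseRecord);
    }

    /** Assumed custom payload: the log record holds only the changed fields. */
    static final class PartialUpdatePayload implements Payload {
        private final Map<String, Object> delta;

        PartialUpdatePayload(Map<String, Object> delta) {
            this.delta = delta;
        }

        Map<String, Object> getDelta() {
            return delta;
        }

        @Override
        public Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> baseRecord) {
            // Start from the base record, then overlay the fields present in the delta.
            Map<String, Object> merged = new HashMap<>(baseRecord);
            merged.putAll(delta);
            return Optional.of(merged);
        }
    }

    /** "Log record is the latest": the base record is ignored entirely. */
    static Map<String, Object> readLogOnly(Map<String, Object> base, PartialUpdatePayload logRecord) {
        return logRecord.getDelta();
    }

    /** "Merge base + delta": the payload decides the final record (null if it returns empty). */
    static Map<String, Object> readMerged(Map<String, Object> base, Payload logRecord) {
        return logRecord.combineAndGetUpdateValue(base).orElse(null);
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("id", 1);
        base.put("name", "alice");
        base.put("city", "Hangzhou");

        Map<String, Object> delta = new HashMap<>();
        delta.put("id", 1);
        delta.put("city", "Beijing");
        PartialUpdatePayload logRecord = new PartialUpdatePayload(delta);

        // The log-only path has no "name" field; the merged path keeps it from the base record.
        System.out.println("log only, name = " + readLogOnly(base, logRecord).get("name"));
        System.out.println("merged,   name = " + readMerged(base, logRecord).get("name"));
        System.out.println("merged,   city = " + readMerged(base, logRecord).get("city"));
    }
}
```

In this sketch, a payload that just returns the log record from `combineAndGetUpdateValue` would make both paths agree, so the mismatch only shows up with payloads that actually use the base record.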

 

cc [~garyli1019], since you wrote the `HoodieMergeOnReadRDD` code. It would be great if `RealtimeCompactedRecordReader` also merged the base and delta records.

 

Also cc [~wayblink].

  was:
Hi, while writing our internal payload class, we found that Hudi's `RealtimeCompactedRecordReader` behaves differently from `HoodieMergeOnReadRDD`.

Specifically, when reading a delta record with merging of log and base files enabled, the expected behavior is to merge the base record with the delta record. `HoodieMergeOnReadRDD` handles this correctly in its `mergeRowWithLog` method:
{code:scala}
// Merge the current base-file row with the log record that has the same key.
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
  // Convert the base row to Avro so the payload can combine it with the log record.
  val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
  // Let the payload class decide the merged result instead of assuming the log record wins.
  logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
}
{code}
However, `RealtimeCompactedRecordReader` has no similar logic; it simply assumes the record in the delta log is the latest.

cc [~garyli1019], since you wrote the `HoodieMergeOnReadRDD` code. It would be great if `RealtimeCompactedRecordReader` also merged the base and delta records.


> Different behavior between RealtimeCompactedRecordReader and HoodieMergeOnReadRDD
> ---------------------------------------------------------------------------------
>
>                 Key: HUDI-1397
>                 URL: https://issues.apache.org/jira/browse/HUDI-1397
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Xianjin YE
>            Priority: Major


