Posted to commits@hudi.apache.org by "Zhaojing Yu (Jira)" <ji...@apache.org> on 2022/10/01 12:11:00 UTC

[jira] [Updated] (HUDI-4733) Flag emitDelete is inconsistent in HoodieTableSource and MergeOnReadInputFormat

     [ https://issues.apache.org/jira/browse/HUDI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhaojing Yu updated HUDI-4733:
------------------------------
    Fix Version/s: 0.13.0
                       (was: 0.12.1)

> Flag emitDelete is inconsistent in HoodieTableSource and MergeOnReadInputFormat
> -------------------------------------------------------------------------------
>
>                 Key: HUDI-4733
>                 URL: https://issues.apache.org/jira/browse/HUDI-4733
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink, flink-sql
>            Reporter: nonggia.liang
>            Assignee: Zhaojing Yu
>            Priority: Minor
>             Fix For: 0.13.0
>
>         Attachments: image 1.png
>
>
> When reading a MOR table in Flink, we encountered an exception from the Flink runtime (as shown in image 1) complaining that the table source should not emit a retract record.
> !image 1.png!
> I think the cause is here, in HoodieTableSource:
> {code:java}
> @Override
> public ChangelogMode getChangelogMode() {
>   // when read as streaming and changelog mode is enabled, emit as FULL mode;
>   // when all the changes are compacted or read as batch, emit as INSERT mode.
>   return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
> } {code}
> {code:java}
> private InputFormat<RowData, ?> getStreamInputFormat() {
>   ...
>   if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
>     final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
>     boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
>     return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList(), emitDelete);
>   }
>   ...
> }
> {code}
> With these options:
> {{'table.type' = 'MERGE_ON_READ'}}
> {{'read.streaming.enabled' = 'true'}}
> {{HoodieTableSource}} announces that it emits an INSERT-only changelog, but MergeOnReadInputFormat will still emit deletes.
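
The mismatch described above can be modelled without any Hudi or Flink dependency. The following is only a sketch under stated assumptions: the class and method names are hypothetical, the configuration is a plain map rather than a Flink Configuration, and the 'changelog.enabled' key and the COPY_ON_WRITE default are assumptions inferred from the code comments in the report, not taken from the Hudi source. It only restates the two decisions quoted in the description and checks whether they agree.

```java
import java.util.Map;

// Hypothetical, dependency-free model of the two decisions quoted in the report.
class EmitDeleteConsistencySketch {

    // Models getChangelogMode(): per its comment, FULL is declared only when the
    // read is streaming AND changelog mode is enabled; otherwise INSERT-only.
    // ASSUMPTION: 'changelog.enabled' is the relevant option key.
    static boolean declaresFullChangelog(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault("read.streaming.enabled", "false"))
            && Boolean.parseBoolean(conf.getOrDefault("changelog.enabled", "false"));
    }

    // Models the snapshot branch of getStreamInputFormat(): emitDelete is set
    // purely from the table type, independent of the changelog option.
    // ASSUMPTION: the table type defaults to COPY_ON_WRITE when unset.
    static boolean emitsDelete(Map<String, String> conf) {
        return "MERGE_ON_READ".equals(conf.getOrDefault("table.type", "COPY_ON_WRITE"));
    }

    // The source is inconsistent when it may emit deletes while declaring INSERT-only.
    static boolean isInconsistent(Map<String, String> conf) {
        return emitsDelete(conf) && !declaresFullChangelog(conf);
    }

    public static void main(String[] args) {
        // The options from the report.
        Map<String, String> reported = Map.of(
            "table.type", "MERGE_ON_READ",
            "read.streaming.enabled", "true");
        System.out.println("inconsistent = " + isInconsistent(reported));
    }
}
```

With the reported options the sketch flags an inconsistency, because the delete flag depends only on the table type while the declared changelog mode also depends on the changelog option. One possible direction, not confirmed by this message, would be to derive both values from the same condition.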



--
This message was sent by Atlassian Jira
(v8.20.10#820010)