Posted to commits@hudi.apache.org by "Danny Chen (Jira)" <ji...@apache.org> on 2022/06/20 07:20:00 UTC

[jira] [Resolved] (HUDI-4277) Support computed column for flink HoodieTableSource

     [ https://issues.apache.org/jira/browse/HUDI-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Chen resolved HUDI-4277.
------------------------------

> Support computed column for flink HoodieTableSource 
> ----------------------------------------------------
>
>                 Key: HUDI-4277
>                 URL: https://issues.apache.org/jira/browse/HUDI-4277
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink, flink-sql
>            Reporter: Shizhi Chen
>            Assignee: Shizhi Chen
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.12.0
>
>         Attachments: image-2022-06-17-19-20-33-323.png
>
>
> The current implementation of HoodieTableSource does not take computed columns into account:
>  
> {code:java}
> public HoodieTableSource(
>     ResolvedSchema schema,
>     Path path,
>     List<String> partitionKeys,
>     String defaultPartName,
>     Configuration conf,
>     @Nullable List<Map<String, String>> requiredPartitions,
>     @Nullable int[] requiredPos,
>     @Nullable Long limit,
>     @Nullable List<Expression> filters) {
>   this.schema = schema;
>   this.path = path;
>   this.partitionKeys = partitionKeys;
>   this.defaultPartName = defaultPartName;
>   this.conf = conf;
>   this.fileIndex = FileIndex.instance(this.path, this.conf);
>   this.requiredPartitions = requiredPartitions;
>   this.requiredPos = requiredPos == null
>       ? IntStream.range(0, schema.getColumnCount()).toArray()
>       : requiredPos;{code}
> When we use Flink SQL to declare a computed column like:
> {code:sql}
> CREATE TABLE hudi_source (
>   xxx,
>   ts_str STRING,
>   ts as TO_TIMESTAMP(ts_str,'yyyy-MM-dd HH:mm:ss'),
>   WATERMARK FOR ts AS ts,
>   PRIMARY KEY(xxx) NOT ENFORCED
> )  WITH (...){code}
> the Flink SQL planner does not call the applyProjection method defined by the source:
> {code:java}
> @Override
> public void applyProjection(int[][] projections) {
>   // nested projection is not supported.
>   this.requiredPos = Arrays.stream(projections).mapToInt(array -> array[0]).toArray();
> } {code}
> As a result, the job fails with an unexpected runtime exception:
> !image-2022-06-17-19-20-33-323.png!
>  
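
To make the mismatch described above concrete, here is a minimal sketch in plain Java. It is not the Hudi fix and uses no Flink or Hudi classes: SketchColumn, ComputedColumnSketch, and the column name "id" (standing in for the elided xxx key) are hypothetical names introduced only for illustration. The assumption is that a computed column such as ts is declared in the table schema but not stored in the underlying files, so a default requiredPos that spans every declared column points one position past the stored fields. Flink's own Column class exposes a comparable isPhysical() check.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical stand-in for a table column; not a Flink or Hudi class.
final class SketchColumn {
  final String name;
  final boolean physical; // false for computed columns such as `ts`

  SketchColumn(String name, boolean physical) {
    this.name = name;
    this.physical = physical;
  }
}

class ComputedColumnSketch {
  // Mirrors the default in the quoted constructor: one position per declared column.
  static int[] allPositions(List<SketchColumn> columns) {
    return IntStream.range(0, columns.size()).toArray();
  }

  // Assumed alternative: positions into the physical row, which holds only stored columns.
  static int[] physicalPositions(List<SketchColumn> columns) {
    long physicalCount = columns.stream().filter(c -> c.physical).count();
    return IntStream.range(0, (int) physicalCount).toArray();
  }

  public static void main(String[] args) {
    List<SketchColumn> columns = Arrays.asList(
        new SketchColumn("id", true),      // placeholder for the elided key column
        new SketchColumn("ts_str", true),  // stored string timestamp
        new SketchColumn("ts", false));    // computed from ts_str, not stored

    // Default over all declared columns includes position 2, which has no stored field.
    System.out.println(Arrays.toString(allPositions(columns)));      // prints [0, 1, 2]
    // Restricting to physical columns yields only positions that exist in the files.
    System.out.println(Arrays.toString(physicalPositions(columns))); // prints [0, 1]
  }
}
```

Under these assumptions, the extra position produced by the first method is the kind of out-of-range read that could surface as a runtime exception when no projection has been applied.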



--
This message was sent by Atlassian Jira
(v8.20.7#820007)