Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/08/11 08:33:00 UTC

[jira] [Comment Edited] (FLINK-18862) Fix LISTAGG throws BinaryRawValueData cannot be cast to StringData exception in runtime

    [ https://issues.apache.org/jira/browse/FLINK-18862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175195#comment-17175195 ] 

Jark Wu edited comment on FLINK-18862 at 8/11/20, 8:33 AM:
-----------------------------------------------------------

Fixed in 
 - master (1.12.0): bfbdca9f574b4201ba7a400607c5169134ecf0a2
 - 1.11.2: a72a8cd430a00c51ba55d7d9eed3e39dcbac3164


was (Author: jark):
Fixed in 
 - master (1.12.0): bfbdca9f574b4201ba7a400607c5169134ecf0a2
 - 1.11.2: TODO

> Fix LISTAGG throws BinaryRawValueData cannot be cast to StringData exception in runtime
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-18862
>                 URL: https://issues.apache.org/jira/browse/FLINK-18862
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>            Reporter: YUJIANBO
>            Assignee: Jark Wu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
>
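>         Attachments: sql 中agg算子的 count(1) 的结果强转成String类型再经过一次agg的操作就不能成功.txt (filename in English: "In SQL, casting the count(1) result of an agg operator to String and then aggregating again fails")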
>
>
> 1. Env: Flink SQL, version 1.11.1, per-job mode
> 2. Error: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
> 3. Job:
> (1) create a kafka table
> {code:sql}
>     CREATE TABLE kafka (
>         x STRING,
>         y STRING
>     ) WITH (
>         'connector' = 'kafka',
>         ......
>     )
> {code}
> (2) create a view:
> {code:sql}
>    CREATE VIEW view1 AS
>    SELECT 
>        x, 
>        y, 
>        CAST(COUNT(1) AS VARCHAR) AS ct
>    FROM kafka
>    GROUP BY 
>        x, y
> {code}
> (3) aggregate on the view:
> {code:sql}
>     SELECT 
>          x, 
>          LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
>     FROM view1
>     GROUP BY x
> {code}
> The job then throws: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
> The problem is that there is no RawValueData anywhere in the query. The result type of COUNT(1) should be BIGINT, not RawValueData. 
>    
> (4) If there is no aggregation on the view, the job runs successfully.
> {code:sql}
>     SELECT 
>         x, 
>         CONCAT_WS('=', y, ct)
>     FROM view1
> {code}
> The detailed exception:
> {code:java}
> java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
> 	at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) ~[flink-table-blink_2.11-1.11.1.jar:?]
> 	at org.apache.flink.table.data.RowData.get(RowData.java:273) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> 	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> 	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> 	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [ad_features_auto-1.0-SNAPSHOT.jar:?]
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [ad_features_auto-1.0-SNAPSHOT.jar:?]
> {code}
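
For readers following the stack trace above: the exception surfaces in GenericRowData.getString while RowDataSerializer copies the row, i.e. the field is read as a string but the stored object is a raw value. The sketch below reproduces only that failure mode in plain Java. Every class and method in it (RowCastSketch, StringValue, RawValue, GenericRow, copyDeclaredStringField) is a hypothetical stand-in, not a Flink class, and it does not attempt to show why the raw value ends up in the row or how the fix works.

```java
// Hypothetical stand-ins only; none of these are Flink classes.
public class RowCastSketch {

    /** Stand-in for a string value (the role StringData plays in the trace). */
    static final class StringValue {
        final String value;
        StringValue(String value) { this.value = value; }
    }

    /** Stand-in for an opaque raw value (the role BinaryRawValueData plays). */
    static final class RawValue {
        final Object payload;
        RawValue(Object payload) { this.payload = payload; }
    }

    /** Stand-in for a generic row that stores its fields as plain objects. */
    static final class GenericRow {
        private final Object[] fields;
        GenericRow(Object... fields) { this.fields = fields; }

        // The cast is only checked at runtime: it succeeds only if the
        // stored object really is a StringValue.
        StringValue getString(int pos) {
            return (StringValue) fields[pos];
        }
    }

    /**
     * Copies one field the way a schema-driven copier would: it trusts the
     * declared type (string here) and never inspects the stored object.
     */
    static String copyDeclaredStringField(GenericRow row, int pos) {
        try {
            return "copied: " + row.getString(pos).value;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        GenericRow good = new GenericRow(new StringValue("a=1"));
        GenericRow bad = new GenericRow(new RawValue(1L));
        System.out.println(copyDeclaredStringField(good, 0));
        System.out.println(copyDeclaredStringField(bad, 0));
    }
}
```

In this sketch the first row copies normally and the second hits the cast failure. The point is that the mismatch is between the declared field type and the object actually stored, so it only shows up at runtime when the row is read.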



--
This message was sent by Atlassian Jira
(v8.3.4#803005)