You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2020/01/17 09:33:00 UTC

[jira] [Commented] (FLINK-15631) Cannot use generic types as the result of an AggregateFunction in Blink planner

    [ https://issues.apache.org/jira/browse/FLINK-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017838#comment-17017838 ] 

Dawid Wysakowicz commented on FLINK-15631:
------------------------------------------

[~lzljs3620320] Could you have a look at this? Apparently it was not safe to remove the implementation of {{BinaryGeneric#equals}} as we discussed in FLINK-13702.

> Cannot use generic types as the result of an AggregateFunction in Blink planner
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-15631
>                 URL: https://issues.apache.org/jira/browse/FLINK-15631
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> It is not possible to use a GenericTypeInfo as the result type of an {{AggregateFunction}} in retract mode with state cleaning disabled.
> {code}
>   @Test
>   def testGenericTypes(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val tEnv = StreamTableEnvironment.create(env, setting)
>     val t = env.fromElements(1, 2, 3).toTable(tEnv, 'a)
>     val results = t
>       .select(new GenericAggregateFunction()('a))
>       .toRetractStream[Row]
>     val sink = new TestingRetractSink
>     results.addSink(sink).setParallelism(1)
>     env.execute()
>   }
> class RandomClass(var i: Int)
> class GenericAggregateFunction extends AggregateFunction[java.lang.Integer, RandomClass] {
>   override def getValue(accumulator: RandomClass): java.lang.Integer = accumulator.i
>   override def createAccumulator(): RandomClass = new RandomClass(0)
>   override def getResultType: TypeInformation[java.lang.Integer] = new GenericTypeInfo[Integer](classOf[Integer])
>   override def getAccumulatorType: TypeInformation[RandomClass] = new GenericTypeInfo[RandomClass](
>     classOf[RandomClass])
>   def accumulate(acc: RandomClass, value: Int): Unit = {
>     acc.i = value
>   }
>   def retract(acc: RandomClass, value: Int): Unit = {
>     acc.i = value
>   }
>   def resetAccumulator(acc: RandomClass): Unit = {
>     acc.i = 0
>   }
> }
> {code}
> The code above fails with:
> {code}
> Caused by: java.lang.UnsupportedOperationException: BinaryGeneric cannot be compared
> 	at org.apache.flink.table.dataformat.BinaryGeneric.equals(BinaryGeneric.java:77)
> 	at GroupAggValueEqualiser$17.equalsWithoutHeader(Unknown Source)
> 	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:177)
> 	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> 	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> This is related to FLINK-13702.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)