You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2020/01/19 04:17:00 UTC
[jira] [Comment Edited] (FLINK-15631) Cannot use generic types as
the result of an AggregateFunction in Blink planner
[ https://issues.apache.org/jira/browse/FLINK-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018789#comment-17018789 ]
Jingsong Lee edited comment on FLINK-15631 at 1/19/20 4:16 AM:
---------------------------------------------------------------
[~dwysakowicz] Good catch! I think this is bug. We need let it safe to remove the implementation of {{BinaryGeneric#equals}}.
I reproduced bug by closing {{IdleStateRetentionTime}}, CC: [~jark] , looks like we need test it because our it case must open {{IdleStateRetentionTime}}. I think there are three problem:
# Equals code generation not implement generic type.
# Aggregation {{EqualiserCodeGenerator}} need use code generation util {{ScalarOperatorGens}} to generate codes instead of another implementation version.
# Aggregation {{EqualiserCodeGenerator}} handles timestamp type in a wrong way, timestamp internal format is not primitive anymore.
Can you assign this to me?
was (Author: lzljs3620320):
[~dwysakowicz] Good catch! I think this is bug. We need let it safe to remove the implementation of {{BinaryGeneric#equals}}.
I reproduced bug by closing {{IdleStateRetentionTime}}, CC: [~jark] , looks like we need test it because our it case must open {{IdleStateRetentionTime}}. I think there are two problem:
# Equals code generation not implement generic type.
# Aggregation {{EqualiserCodeGenerator}} need use code generation util {{ScalarOperatorGens}} to generate codes instead of another implementation version.
Can you assign this to me?
> Cannot use generic types as the result of an AggregateFunction in Blink planner
> -------------------------------------------------------------------------------
>
> Key: FLINK-15631
> URL: https://issues.apache.org/jira/browse/FLINK-15631
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Dawid Wysakowicz
> Priority: Major
> Fix For: 1.10.0
>
>
> It is not possible to use a GenericTypeInfo for a result type of an {{AggregateFunction}} in a retract mode with state cleaning disabled.
> {code}
> @Test
> def testGenericTypes(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val tEnv = StreamTableEnvironment.create(env, setting)
> val t = env.fromElements(1, 2, 3).toTable(tEnv, 'a)
> val results = t
> .select(new GenericAggregateFunction()('a))
> .toRetractStream[Row]
> val sink = new TestingRetractSink
> results.addSink(sink).setParallelism(1)
> env.execute()
> }
> class RandomClass(var i: Int)
> class GenericAggregateFunction extends AggregateFunction[java.lang.Integer, RandomClass] {
> override def getValue(accumulator: RandomClass): java.lang.Integer = accumulator.i
> override def createAccumulator(): RandomClass = new RandomClass(0)
> override def getResultType: TypeInformation[java.lang.Integer] = new GenericTypeInfo[Integer](classOf[Integer])
> override def getAccumulatorType: TypeInformation[RandomClass] = new GenericTypeInfo[RandomClass](
> classOf[RandomClass])
> def accumulate(acc: RandomClass, value: Int): Unit = {
> acc.i = value
> }
> def retract(acc: RandomClass, value: Int): Unit = {
> acc.i = value
> }
> def resetAccumulator(acc: RandomClass): Unit = {
> acc.i = 0
> }
> }
> {code}
> The code above fails with:
> {code}
> Caused by: java.lang.UnsupportedOperationException: BinaryGeneric cannot be compared
> at org.apache.flink.table.dataformat.BinaryGeneric.equals(BinaryGeneric.java:77)
> at GroupAggValueEqualiser$17.equalsWithoutHeader(Unknown Source)
> at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:177)
> at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> This is related to FLINK-13702
--
This message was sent by Atlassian Jira
(v8.3.4#803005)