You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/01/21 03:23:00 UTC
[jira] [Resolved] (FLINK-15631) Cannot use generic types as the result of an AggregateFunction in Blink planner
[ https://issues.apache.org/jira/browse/FLINK-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu resolved FLINK-15631.
-----------------------------
Resolution: Fixed
Fixed in 1.11.0: a22b9d8ff92764a4437347bb6d2deb10e9a4e054
Fixed in 1.10.0: ce72e50451cfd5ab7e134b5e3d1ac29228d8a763
> Cannot use generic types as the result of an AggregateFunction in Blink planner
> -------------------------------------------------------------------------------
>
> Key: FLINK-15631
> URL: https://issues.apache.org/jira/browse/FLINK-15631
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Dawid Wysakowicz
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> It is not possible to use a GenericTypeInfo as the result type of an {{AggregateFunction}} in retract mode with state cleaning disabled.
> {code}
> @Test
> def testGenericTypes(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val tEnv = StreamTableEnvironment.create(env, setting)
> val t = env.fromElements(1, 2, 3).toTable(tEnv, 'a)
> val results = t
> .select(new GenericAggregateFunction()('a))
> .toRetractStream[Row]
> val sink = new TestingRetractSink
> results.addSink(sink).setParallelism(1)
> env.execute()
> }
> class RandomClass(var i: Int)
> class GenericAggregateFunction extends AggregateFunction[java.lang.Integer, RandomClass] {
> override def getValue(accumulator: RandomClass): java.lang.Integer = accumulator.i
> override def createAccumulator(): RandomClass = new RandomClass(0)
> override def getResultType: TypeInformation[java.lang.Integer] = new GenericTypeInfo[Integer](classOf[Integer])
> override def getAccumulatorType: TypeInformation[RandomClass] = new GenericTypeInfo[RandomClass](
> classOf[RandomClass])
> def accumulate(acc: RandomClass, value: Int): Unit = {
> acc.i = value
> }
> def retract(acc: RandomClass, value: Int): Unit = {
> acc.i = value
> }
> def resetAccumulator(acc: RandomClass): Unit = {
> acc.i = 0
> }
> }
> {code}
> The code above fails with:
> {code}
> Caused by: java.lang.UnsupportedOperationException: BinaryGeneric cannot be compared
> at org.apache.flink.table.dataformat.BinaryGeneric.equals(BinaryGeneric.java:77)
> at GroupAggValueEqualiser$17.equalsWithoutHeader(Unknown Source)
> at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:177)
> at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> This is related to FLINK-13702.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)