You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2020/01/17 09:31:00 UTC
[jira] [Created] (FLINK-15631) Cannot use generic types as the
result of an AggregateFunction in Blink planner
Dawid Wysakowicz created FLINK-15631:
----------------------------------------
Summary: Cannot use generic types as the result of an AggregateFunction in Blink planner
Key: FLINK-15631
URL: https://issues.apache.org/jira/browse/FLINK-15631
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.9.0, 1.10.0
Reporter: Dawid Wysakowicz
It is not possible to use a {{GenericTypeInfo}} as the result type of an {{AggregateFunction}} in retract mode with state cleaning disabled.
{code}
/**
 * Reproduction case: applies [[GenericAggregateFunction]] to a one-column table and
 * converts the result to a retract stream, using the Blink planner in streaming mode.
 */
@Test
def testGenericTypes(): Unit = {
  val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
  val blinkStreaming = EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
  val tableEnv = StreamTableEnvironment.create(execEnv, blinkStreaming)

  // Table with a single column 'a built from the three input elements.
  val input = execEnv.fromElements(1, 2, 3).toTable(tableEnv, 'a)
  val retractStream = input.select(new GenericAggregateFunction()('a)).toRetractStream[Row]

  val retractSink = new TestingRetractSink
  retractStream.addSink(retractSink).setParallelism(1)
  execEnv.execute()
}
// Plain mutable holder for a single Int; used as the accumulator type of GenericAggregateFunction.
class RandomClass(var i: Int)
/**
 * Aggregate function that declares both its result type and its accumulator type
 * as `GenericTypeInfo`. The accumulator simply remembers the last value it was given.
 */
class GenericAggregateFunction extends AggregateFunction[java.lang.Integer, RandomClass] {

  /** Creates an accumulator holding 0. */
  override def createAccumulator(): RandomClass = new RandomClass(0)

  /** Returns the accumulator's current value, boxed. */
  override def getValue(accumulator: RandomClass): java.lang.Integer = Int.box(accumulator.i)

  /** Result type is declared as a generic type over `java.lang.Integer`. */
  override def getResultType: TypeInformation[java.lang.Integer] = {
    val resultClass = classOf[Integer]
    new GenericTypeInfo[Integer](resultClass)
  }

  /** Accumulator type is declared as a generic type over [[RandomClass]]. */
  override def getAccumulatorType: TypeInformation[RandomClass] = {
    val accumulatorClass = classOf[RandomClass]
    new GenericTypeInfo[RandomClass](accumulatorClass)
  }

  /** Overwrites the stored value with `value`. */
  def accumulate(acc: RandomClass, value: Int): Unit = acc.i = value

  /** Overwrites the stored value with `value` (same effect as `accumulate`). */
  def retract(acc: RandomClass, value: Int): Unit = acc.i = value

  /** Sets the stored value back to 0. */
  def resetAccumulator(acc: RandomClass): Unit = acc.i = 0
}
{code}
The code above fails inside the generated {{GroupAggValueEqualiser}}, which calls {{BinaryGeneric.equals}}:
{code}
Caused by: java.lang.UnsupportedOperationException: BinaryGeneric cannot be compared
at org.apache.flink.table.dataformat.BinaryGeneric.equals(BinaryGeneric.java:77)
at GroupAggValueEqualiser$17.equalsWithoutHeader(Unknown Source)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:177)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
{code}
This is related to FLINK-13702.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)