Posted to dev@flink.apache.org by "sunjincheng (JIRA)" <ji...@apache.org> on 2017/05/20 18:39:04 UTC
[jira] [Created] (FLINK-6650) Fix Non-windowed group-aggregate error when using append-table mode.
sunjincheng created FLINK-6650:
----------------------------------
Summary: Fix Non-windowed group-aggregate error when using append-table mode.
Key: FLINK-6650
URL: https://issues.apache.org/jira/browse/FLINK-6650
Project: Flink
Issue Type: Sub-task
Reporter: sunjincheng
Assignee: sunjincheng
When I tested a non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink)}}, I got the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is that {{DataStreamGroupAggregate#producesUpdates}} always returns true:
{code}
override def producesUpdates = true
{code}
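To illustrate why this flag alone is enough to trigger the exception, here is a minimal, self-contained sketch of an append-only check. It is a simplified model, not Flink's actual classes: {{PlanNode}}, {{Scan}}, {{GroupAggregate}}, {{isAppendOnly}} and {{toAppend}} are hypothetical names, and only the error message is taken from the stack trace above.

```scala
// Simplified, hypothetical model of an append-only check; not Flink's real plan classes.
object AppendCheckSketch {
  // Hypothetical stand-in for a node of the translated plan.
  trait PlanNode {
    def inputs: Seq[PlanNode]
    // Plays the role of DataStreamGroupAggregate#producesUpdates.
    def producesUpdates: Boolean = false
  }

  // A source node: it only ever appends rows.
  case class Scan(name: String) extends PlanNode {
    val inputs: Seq[PlanNode] = Seq.empty
  }

  // A non-windowed group-aggregate that unconditionally reports updates.
  case class GroupAggregate(input: PlanNode) extends PlanNode {
    val inputs: Seq[PlanNode] = Seq(input)
    override def producesUpdates: Boolean = true
  }

  // A plan is append-only if no node in it produces updates.
  def isAppendOnly(node: PlanNode): Boolean =
    !node.producesUpdates && node.inputs.forall(isAppendOnly)

  // Hypothetical analogue of the check performed when converting to an append stream.
  def toAppend(node: PlanNode): Either[String, PlanNode] =
    if (isAppendOnly(node)) Right(node)
    else Left("Table is not an append-only table. " +
      "Output needs to handle update and delete changes.")
}
```

Under this model, {{AppendCheckSketch.toAppend(GroupAggregate(Scan("t")))}} is always rejected, regardless of whether the user would accept one appended row per input record.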
I think that, from the user's point of view, what the user wants is (for example):
Data:
{code}
val data = List(
(1L, 1, "Hello"),
(2L, 2, "Hello"),
(3L, 3, "Hello"),
(4L, 4, "Hello"),
(5L, 5, "Hello"),
(6L, 6, "Hello"),
(7L, 7, "Hello World"),
(8L, 8, "Hello World"),
(20L, 20, "Hello World"))
{code}
* Case 1:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toAppendStream[Row]
.addSink(new StreamITCase.StringSink)
{code}
Result
{code}
1
3
6
10
15
21
28
36
56
{code}
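The Case 1 result is a running sum over column {{'a}}: one output row per input record. As a sanity check, the following plain Scala sketch (no Flink involved; the object and value names are illustrative) reproduces those numbers from the sample data.

```scala
// Plain-Scala illustration of the Case 1 output; this is not Flink code.
object RunningSumSketch {
  val data: List[(Long, Int, String)] = List(
    (1L, 1, "Hello"),
    (2L, 2, "Hello"),
    (3L, 3, "Hello"),
    (4L, 4, "Hello"),
    (5L, 5, "Hello"),
    (6L, 6, "Hello"),
    (7L, 7, "Hello World"),
    (8L, 8, "Hello World"),
    (20L, 20, "Hello World"))

  // Column 'a is the first tuple field. scanLeft yields every intermediate sum;
  // tail drops the initial 0 so that exactly one value remains per input record.
  val appendResults: List[Long] = data.map(_._1).scanLeft(0L)(_ + _).tail
}
```

{{RunningSumSketch.appendResults}} evaluates to 1, 3, 6, 10, 15, 21, 28, 36, 56, matching the result listed above.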
* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}
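In retract mode, each change to the aggregate is encoded as a retraction of the old value followed by an addition of the new one, so a sink that applies the messages ends up holding only the final sum. The sketch below models this in plain Scala. The {{(Boolean, value)}} message shape follows what {{toRetractStream}} emits in the Scala API; {{materialize}} is a hypothetical stand-in for a retracting sink, not the actual {{StreamITCase.RetractingSink}}.

```scala
// Plain-Scala model of retract messages for a global sum; this is not Flink code.
object RetractSketch {
  // Values of column 'a from the sample data.
  val a: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Running sums after each input record.
  val sums: List[Long] = a.scanLeft(0L)(_ + _).tail

  // (true, v) adds a row, (false, v) retracts a previously added row.
  // The first sum is only added; every later sum retracts its predecessor first.
  val messages: List[(Boolean, Long)] =
    (true, sums.head) :: sums.zip(sums.tail).flatMap {
      case (old, cur) => List((false, old), (true, cur))
    }

  // Hypothetical retracting sink: applies the messages to an in-memory result.
  def materialize(msgs: List[(Boolean, Long)]): List[Long] =
    msgs.foldLeft(List.empty[Long]) { case (rows, (isAdd, v)) =>
      if (isAdd) v :: rows        // add the new row
      else rows.diff(List(v))     // remove one occurrence of the retracted row
    }
}
```

{{RetractSketch.materialize(RetractSketch.messages)}} leaves a single row, 56, which is the Case 2 result.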
In fact, for Case 1 we can use an unbounded OVER window, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as #Case1
{code}
However, after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649], OVER cannot express Case 1 with earlyFiring.
So I still think that a non-windowed group-aggregate is not always an update table; the user should be able to decide which mode to use.
Is there any drawback to this improvement? Any feedback is welcome.