Posted to issues@flink.apache.org by "Andy (Jira)" <ji...@apache.org> on 2021/01/10 08:30:00 UTC
[jira] [Created] (FLINK-20910) Remove restriction on
StreamPhysicalGroupWindowAggregate that StreamPhysicalGroupWindowAggregate
only support insert-only input node
Andy created FLINK-20910:
----------------------------
Summary: Remove restriction on StreamPhysicalGroupWindowAggregate that StreamPhysicalGroupWindowAggregate only support insert-only input node
Key: FLINK-20910
URL: https://issues.apache.org/jira/browse/FLINK-20910
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Andy
Currently, the optimizer throws an exception if a window aggregate has an input node that produces anything other than insert-only records.
E.g. 1: a deduplicate on row-time followed by a window aggregate:
{code:java}
@Test
def testWindowAggWithDeduplicateAsInput(): Unit = {
  val sql =
    """
      |SELECT
      |  b,
      |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,
      |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) as w_end,
      |  COUNT(1) AS cnt
      |FROM
      |  (
      |    SELECT b, rowtime
      |    FROM (
      |      SELECT *,
      |        ROW_NUMBER() OVER (PARTITION BY b ORDER BY `rowtime` DESC) as rowNum
      |      FROM MyTable
      |    )
      |    WHERE rowNum = 1
      |  )
      |GROUP BY b, TUMBLE(rowtime, INTERVAL '0.005' SECOND)
      |""".stripMargin
  util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}
{code}
E.g. 2: a window aggregate with early fire/late fire enabled followed by another window aggregate:
{code:java}
@Test
def testWindowAggWithLateFireWindowAggAsInput(): Unit = {
  util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true)
  util.conf.getConfiguration.set(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, Duration.ofSeconds(5))
  util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
  val sql =
    """
      |SELECT SUM(cnt)
      |FROM (
      |  SELECT COUNT(1) AS cnt, TUMBLE_ROWTIME(`rowtime`, INTERVAL '10' SECOND) AS ts
      |  FROM MyTable
      |  GROUP BY a, b, TUMBLE(`rowtime`, INTERVAL '10' SECOND)
      |)
      |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
      |""".stripMargin
  util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}
{code}
The following exception is thrown for the cases above:
{code:java}
org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:384)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:165)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:343)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:332)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:331)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:331)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:281)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:343)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:332)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:331)
...{code}
`FlinkChangelogModeInferenceProgram` currently requires the input of a window aggregate to be insert-only. However, `WindowOperator` can handle insert, update-before, update-after and delete messages, so this restriction in the planner could be removed.
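To illustrate what consuming all four message kinds means for a windowed COUNT, here is a minimal standalone sketch. It is not Flink code and does not show how `WindowOperator` is implemented: `ChangeKind`, `Change` and `RetractableWindowCount` are hypothetical names made up for this example. It assumes insert and update-after messages are accumulated while update-before and delete messages are retracted, with each message assigned to the tumbling window of its own rowtime.

```scala
// Hypothetical stand-ins for illustration only; none of these are Flink classes.
sealed trait ChangeKind
case object Insert extends ChangeKind
case object UpdateBefore extends ChangeKind
case object UpdateAfter extends ChangeKind
case object Delete extends ChangeKind

final case class Change(kind: ChangeKind, key: String, rowtimeMillis: Long)

object RetractableWindowCount {
  // Start of the tumbling window that contains the given timestamp.
  def windowStart(rowtimeMillis: Long, sizeMillis: Long): Long =
    rowtimeMillis - Math.floorMod(rowtimeMillis, sizeMillis)

  // Folds a changelog into counts per (key, window start).
  def count(changes: Seq[Change], sizeMillis: Long): Map[(String, Long), Long] =
    changes.foldLeft(Map.empty[(String, Long), Long]) { (acc, c) =>
      val slot = (c.key, windowStart(c.rowtimeMillis, sizeMillis))
      val delta = c.kind match {
        case Insert | UpdateAfter  => 1L  // accumulate the row
        case UpdateBefore | Delete => -1L // retract a previously accumulated row
      }
      val next = acc.getOrElse(slot, 0L) + delta
      // Drop windows whose count has been fully retracted.
      if (next == 0L) acc - slot else acc.updated(slot, next)
    }
}
```

For example, a keep-last-row deduplicate on key `b` that first sees a row at rowtime 1 and then a newer row at rowtime 7 would emit an insert for the first row, followed by an update-before for it and an update-after for the second. With 5 ms windows, the sketch ends with one row counted in the window starting at 5 and nothing left in the window starting at 0.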