Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/05/16 14:27:00 UTC

[jira] [Closed] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

     [ https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-9528.
--------------------------
    Fix Version/s: 1.11.0
       Resolution: Fixed

This is fixed by FLINK-16887. 

A test was added in {{org.apache.flink.table.planner.runtime.stream.table.TableSinkITCase#testUpsertSinkWithFilter}}.

> Incorrect results: Filter does not treat Upsert messages correctly.
> -------------------------------------------------------------------
>
>                 Key: FLINK-9528
>                 URL: https://issues.apache.org/jira/browse/FLINK-9528
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.3.3, 1.4.2, 1.5.0
>            Reporter: Fabian Hueske
>            Priority: Critical
>             Fix For: 1.11.0
>
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between retraction and upsert mode. A Calc looks at each record (regardless of its update semantics) and either discards it (predicate evaluates to false) or passes it on (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the problem:
> {code:java}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.enableObjectReuse()
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val t = StreamTestData.get3TupleDataStream(env)
>       .assignAscendingTimestamps(_._1.toLong)
>       .toTable(tEnv, 'id, 'num, 'text)
>     t.select('text.charLength() as 'len)
>       .groupBy('len)
>       .select('len, 'len.count as 'cnt)
>       // .where('cnt < 7)
>       .writeToSink(new TestUpsertSink(Array("len"), false))
>     env.execute()
>     val results = RowCollector.getAndClearValues
>     val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>     val expectedWithoutFilter = List(
>       "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>     val expectedWithFilter = List(
>       "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>     assertEquals(expectedWithoutFilter, retracted)
>     // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows that do not fulfill the condition are removed from the result. However, the filter only drops the upsert message that fails the predicate, so the previously emitted version of that row remains in the result.
> One solution could be to make a filter aware of the update semantics (retract or upsert) and convert the upsert message into a delete message if the predicate evaluates to false.
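
The idea proposed at the end of the description can be illustrated with a small, self-contained Scala sketch. It is not Flink code and not necessarily how FLINK-16887 implements the fix: {{Msg}}, {{Upsert}}, {{Delete}}, {{naiveFilter}}, {{upsertAwareFilter}} and {{materialize}} are hypothetical names introduced only for this illustration, and {{materialize}} merely imitates what an upsert sink does with the messages it receives.

```scala
object UpsertFilterSketch {
  // Hypothetical message types for illustration only; these are not Flink classes.
  sealed trait Msg
  final case class Upsert(key: Int, value: Long) extends Msg
  final case class Delete(key: Int) extends Msg

  // Current behavior as described in the issue: an upsert that fails the
  // predicate is silently dropped, so the sink never learns that the
  // previously emitted version of that key is no longer valid.
  def naiveFilter(msgs: Seq[Msg], pred: Long => Boolean): Seq[Msg] =
    msgs.filter {
      case Upsert(_, v) => pred(v)
      case Delete(_)    => true
    }

  // Proposed behavior: an upsert that fails the predicate is converted
  // into a delete for the same key; all other messages pass through.
  def upsertAwareFilter(msgs: Seq[Msg], pred: Long => Boolean): Seq[Msg] =
    msgs.map {
      case Upsert(k, v) if !pred(v) => Delete(k)
      case other                    => other
    }

  // Imitates an upsert sink: keeps the latest value per key and
  // removes a key when a delete arrives.
  def materialize(msgs: Seq[Msg]): Map[Int, Long] =
    msgs.foldLeft(Map.empty[Int, Long]) {
      case (state, Upsert(k, v)) => state + (k -> v)
      case (state, Delete(k))    => state - k
    }
}
```

For a key whose count grows from 5 to 7 under the predicate {{cnt < 7}}, the naive filter leaves the stale count 6 in the sink, while the upsert-aware variant removes the key. Note that this sketch also emits a delete for a key that never passed the filter; a real implementation might want to suppress such redundant deletes.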


