Posted to issues@flink.apache.org by "Hequn Cheng (JIRA)" <ji...@apache.org> on 2018/03/09 15:13:00 UTC

[jira] [Closed] (FLINK-8898) Materialize time indicators in conditions of LogicalFilter

     [ https://issues.apache.org/jira/browse/FLINK-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hequn Cheng closed FLINK-8898.
------------------------------
    Resolution: Duplicate

> Materialize time indicators in conditions of LogicalFilter
> ----------------------------------------------------------
>
>                 Key: FLINK-8898
>                 URL: https://issues.apache.org/jira/browse/FLINK-8898
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Hequn Cheng
>            Priority: Major
>
> Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators in the conditions of a LogicalFilter, which leads to type mismatch exceptions. The following test case reproduces the exception.
> {code:java}
> @Test
> def reproduceTypeMissmatch(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   env.setStateBackend(getStateBackend)
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   StreamITCase.clear
>
>   val data1 = new mutable.MutableList[(Int, Long, Int, Long)]
>   data1.+=((1, 1L, 1, 1L))
>   data1.+=((1, 2L, 1, 1L))
>
>   val t1 = env.fromCollection(data1)
>     .assignTimestampsAndWatermarks(new Row5WatermarkExtractor)
>     .toTable(tEnv, 'id, 'ip, 'type, 'eventTs.rowtime)
>   tEnv.registerTable("myTable", t1)
>
>   val sql1 = "select distinct id, eventTs as eventTs, count(*) over (partition by id order by eventTs rows" +
>     " between 100 preceding and current row) as cnt1 from myTable"
>   val sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over (partition by id " +
>     "order by eventTs rows between 50 preceding and current row) as cnt2 from myTable"
>
>   val left = tEnv.sqlQuery(sql1)
>   val right = tEnv.sqlQuery(sql2)
>   left.join(right).where("id = r_id && eventTs === r_eventTs").select('id)
>     .writeToSink(new TestRetractSink, queryConfig)
>   env.execute()
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)