Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/02/07 16:19:41 UTC

[jira] [Commented] (FLINK-5735) Non-overlapping sliding window is not deterministic

    [ https://issues.apache.org/jira/browse/FLINK-5735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15856268#comment-15856268 ] 

Aljoscha Krettek commented on FLINK-5735:
-----------------------------------------

A good first step would be to take the Table API out of the picture and create a minimal example that uses just the {{DataStream}} API.
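
Before porting the test to the {{DataStream}} API, it may help to pin down which result should be the deterministic one. Below is a minimal sketch in plain Scala with no Flink dependency. It rests on assumptions that are not confirmed in this thread: windows are aligned to multiples of the slide, each record advances the watermark to the largest timestamp seen so far (mirroring {{TimestampWithEqualWatermark}}), and a record is dropped once the watermark has reached the last timestamp of its window. The names {{NonOverlappingSlideSketch}}, {{windowStart}}, {{counts}} and {{dropLate}} are hypothetical and exist only for this illustration.

```scala
object NonOverlappingSlideSketch {
  // Same records as the quoted test: (timestamp in ms, int, string).
  val data: List[(Long, Int, String)] = List(
    (1L, 1, "Hi"), (2L, 2, "Hallo"), (3L, 2, "Hello"), (6L, 3, "Hello"),
    (4L, 5, "Hello"), (16L, 4, "Hello world"), (8L, 3, "Hello world"))

  val size = 5L   // window length in ms
  val slide = 10L // distance between window starts in ms

  // Start of the only window that can contain ts when size <= slide;
  // None if ts falls into the gap between two windows.
  // Assumption: windows start at multiples of the slide, timestamps are >= 0.
  def windowStart(ts: Long): Option[Long] = {
    val start = ts - (ts % slide)
    if (ts < start + size) Some(start) else None
  }

  // Count per (word, windowStart), processing records in list order.
  // With dropLate = true, a record is discarded when the watermark
  // (assumed: max timestamp seen so far) has already reached the last
  // timestamp of its window (start + size - 1).
  def counts(dropLate: Boolean): Map[(String, Long), Int] = {
    var watermark = Long.MinValue
    val kept = data.flatMap { case (ts, _, word) =>
      val assigned = windowStart(ts)
        .filter(start => !dropLate || start + size - 1 > watermark)
      watermark = math.max(watermark, ts)
      assigned.map(start => (word, start))
    }
    kept.groupBy(identity).map { case (key, hits) => (key, hits.size) }
  }

  def main(args: Array[String]): Unit = {
    println(counts(dropLate = false)) // no record treated as late
    println(counts(dropLate = true))  // strictly in-order watermarks
  }
}
```

Under these assumptions, ignoring lateness gives a count of 2 for "Hello" in the window starting at 0, which matches the expected values in the test, while strict in-order watermarks give 1 because the record with timestamp 4 arrives after the watermark has advanced to 6. Whether this ordering effect is what actually happens in the failing runs still needs to be verified with a real {{DataStream}} job.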

> Non-overlapping sliding window is not deterministic
> ---------------------------------------------------
>
>                 Key: FLINK-5735
>                 URL: https://issues.apache.org/jira/browse/FLINK-5735
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>
> I don't know whether this is a problem in the Table API or in the underlying API. We have to investigate this as part of the issue.
> The following code produces different results from run to run: the count of "Hello" is sometimes 1 and sometimes 2.
> {code}
>   val data = List(
>     (1L, 1, "Hi"),
>     (2L, 2, "Hallo"),
>     (3L, 2, "Hello"),
>     (6L, 3, "Hello"),
>     (4L, 5, "Hello"),
>     (16L, 4, "Hello world"),
>     (8L, 3, "Hello world"))
>   @Test
>   def testEventTimeSlidingWindowNonOverlapping(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.testResults = mutable.MutableList()
>     val stream = env
>       .fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>     val table = stream.toTable(tEnv, 'long, 'int, 'string)
>     val windowedTable = table
>       .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
>       .groupBy('w, 'string)
>       .select('string, 'int.count, 'w.start, 'w.end)
>     val results = windowedTable.toDataStream[Row]
>     results.addSink(new StreamITCase.StringSink)
>     env.execute()
>     val expected = Seq(
>       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
>     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
>   }
>   class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>     override def checkAndGetNextWatermark(
>         lastElement: (Long, Int, String),
>         extractedTimestamp: Long)
>       : Watermark = {
>       new Watermark(extractedTimestamp)
>     }
>     override def extractTimestamp(
>         element: (Long, Int, String),
>         previousElementTimestamp: Long): Long = {
>       element._1
>     }
>   }
> {code}
