Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/02/07 16:19:41 UTC
[jira] [Commented] (FLINK-5735) Non-overlapping sliding window is not deterministic
[ https://issues.apache.org/jira/browse/FLINK-5735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15856268#comment-15856268 ]
Aljoscha Krettek commented on FLINK-5735:
-----------------------------------------
A good first step would be to take the Table API out of the picture and create a minimal example that uses just the {{DataStream}} API.
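As a rough illustration of such a minimal example, the sketch below feeds the same records from the report through a keyed sliding event-time window using only the DataStream API. It assumes a Flink release from the time of this report with the Scala DataStream API on the classpath; the object name SlidingWindowRepro and the job name are hypothetical, and the output omits the window start and end that the Table API query selects.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical object name; not part of the Flink code base.
object SlidingWindowRepro {

  // Same strategy as in the report: every element produces a watermark
  // equal to its own timestamp.
  class TimestampWithEqualWatermark
      extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {

    override def checkAndGetNextWatermark(
        lastElement: (Long, Int, String),
        extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp)

    override def extractTimestamp(
        element: (Long, Int, String),
        previousElementTimestamp: Long): Long =
      element._1
  }

  def main(args: Array[String]): Unit = {
    // Records copied from the report, in the same (out-of-order) sequence.
    val data = List(
      (1L, 1, "Hi"),
      (2L, 2, "Hallo"),
      (3L, 2, "Hello"),
      (6L, 3, "Hello"),
      (4L, 5, "Hello"),
      (16L, 4, "Hello world"),
      (8L, 3, "Hello world"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env
      .fromCollection(data)
      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
      .map(t => (t._3, 1))   // (string, 1) so that sum(1) yields a count
      .keyBy(_._1)
      // Window size 5 ms, slide 10 ms: non-overlapping windows with gaps.
      .window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(10)))
      .sum(1)
      .print()

    env.execute("sliding-window-repro")
  }
}
```

Running this several times and comparing the printed count for "Hello" would show whether the varying result also occurs without the Table API.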
> Non-overlapping sliding window is not deterministic
> ---------------------------------------------------
>
> Key: FLINK-5735
> URL: https://issues.apache.org/jira/browse/FLINK-5735
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Timo Walther
>
> It is unclear whether this is a problem in the Table API or in the underlying API; determining that is part of this issue.
> The following code produces different results from run to run: the count for "Hello" is sometimes 1 and sometimes 2.
> {code}
> val data = List(
>   (1L, 1, "Hi"),
>   (2L, 2, "Hallo"),
>   (3L, 2, "Hello"),
>   (6L, 3, "Hello"),
>   (4L, 5, "Hello"),
>   (16L, 4, "Hello world"),
>   (8L, 3, "Hello world"))
>
> @Test
> def testEventTimeSlidingWindowNonOverlapping(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   StreamITCase.testResults = mutable.MutableList()
>
>   val stream = env
>     .fromCollection(data)
>     .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>   val table = stream.toTable(tEnv, 'long, 'int, 'string)
>
>   val windowedTable = table
>     .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
>     .groupBy('w, 'string)
>     .select('string, 'int.count, 'w.start, 'w.end)
>
>   val results = windowedTable.toDataStream[Row]
>   results.addSink(new StreamITCase.StringSink)
>   env.execute()
>
>   val expected = Seq(
>     "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>     "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>     "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
>   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
> }
>
> class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>
>   override def checkAndGetNextWatermark(
>       lastElement: (Long, Int, String),
>       extractedTimestamp: Long)
>     : Watermark = {
>     new Watermark(extractedTimestamp)
>   }
>
>   override def extractTimestamp(
>       element: (Long, Int, String),
>       previousElementTimestamp: Long): Long = {
>     element._1
>   }
> }
> {code}
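
To make the window arithmetic in the quoted test easier to follow, here is a small sketch in plain Scala with no Flink dependency. It assigns each record to a 5 ms window that starts every 10 ms and counts records per string and window in two ways: ignoring lateness, and dropping a record when a watermark at or past the last timestamp of its window has already been seen. The strictly in-order, single-threaded processing and the drop rule are simplifying assumptions for illustration, not a description of what Flink does in this job, and the object and method names are hypothetical.

```scala
// Hypothetical helper for illustration; it does not use any Flink classes.
object SlidingWindowSketch {
  val sizeMs = 5L
  val slideMs = 10L

  // Records copied from the report, in source order.
  val data: List[(Long, Int, String)] = List(
    (1L, 1, "Hi"), (2L, 2, "Hallo"), (3L, 2, "Hello"), (6L, 3, "Hello"),
    (4L, 5, "Hello"), (16L, 4, "Hello world"), (8L, 3, "Hello world"))

  // Window [start, end) containing ts, if any. Because size < slide there is
  // at most one such window, and timestamps in the gaps belong to none.
  // Only valid for non-negative timestamps.
  def windowFor(ts: Long): Option[(Long, Long)] = {
    val start = ts - (ts % slideMs)
    if (ts < start + sizeMs) Some((start, start + sizeMs)) else None
  }

  // Counts per (string, windowStart, windowEnd). With dropLate = true, a
  // record is skipped when the highest timestamp seen so far has already
  // reached the last timestamp of its window (assumed zero allowed lateness).
  def counts(dropLate: Boolean): Map[(String, Long, Long), Int] = {
    var watermark = Long.MinValue
    var result = Map.empty[(String, Long, Long), Int]
    for ((ts, _, word) <- data) {
      windowFor(ts).foreach { case (start, end) =>
        val late = dropLate && end - 1 <= watermark
        if (!late) {
          val key = (word, start, end)
          result = result.updated(key, result.getOrElse(key, 0) + 1)
        }
      }
      // One watermark per element, equal to its timestamp; never decreasing.
      watermark = math.max(watermark, ts)
    }
    result
  }

  def main(args: Array[String]): Unit = {
    println(s"ignoring lateness: ${counts(dropLate = false)}")
    println(s"dropping late:     ${counts(dropLate = true)}")
  }
}
```

Under these assumptions only the window [0, 5) receives records. The record (4L, 5, "Hello") comes after (6L, 3, "Hello") in the input, so the count for "Hello" is 2 when lateness is ignored and 1 when late records are dropped. That matches the two values mentioned in the report, but it is only one possible explanation and would need to be confirmed against the actual job.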