You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2017/02/07 15:35:41 UTC
[jira] [Created] (FLINK-5735) Non-overlapping sliding window is not
deterministic
Timo Walther created FLINK-5735:
-----------------------------------
Summary: Non-overlapping sliding window is not deterministic
Key: FLINK-5735
URL: https://issues.apache.org/jira/browse/FLINK-5735
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther
I don't know if this is a problem of the Table API or the underlying API. We have to investigate this as part of the issue.
The following code leads to different results from time to time. Sometimes the count of "Hello" is 1 sometimes 2.
{code}
val data = List(
(1L, 1, "Hi"),
(2L, 2, "Hallo"),
(3L, 2, "Hello"),
(6L, 3, "Hello"),
(4L, 5, "Hello"),
(16L, 4, "Hello world"),
(8L, 3, "Hello world"))
@Test
def testEventTimeSlidingWindowNonOverlapping(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
val stream = env
.fromCollection(data)
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
val table = stream.toTable(tEnv, 'long, 'int, 'string)
val windowedTable = table
.window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
val results = windowedTable.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
val expected = Seq(
"Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
"Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
"Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
override def checkAndGetNextWatermark(
lastElement: (Long, Int, String),
extractedTimestamp: Long)
: Watermark = {
new Watermark(extractedTimestamp)
}
override def extractTimestamp(
element: (Long, Int, String),
previousElementTimestamp: Long): Long = {
element._1
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)