You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2017/02/07 15:35:41 UTC

[jira] [Created] (FLINK-5735) Non-overlapping sliding window is not deterministic

Timo Walther created FLINK-5735:
-----------------------------------

             Summary: Non-overlapping sliding window is not deterministic
                 Key: FLINK-5735
                 URL: https://issues.apache.org/jira/browse/FLINK-5735
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Timo Walther


I don't know whether this is a problem in the Table API or in the underlying DataStream API. We have to investigate this as part of this issue.

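The following code produces different results from run to run: sometimes the count of "Hello" is 1, sometimes 2. Possibly relevant: the input is not ordered by timestamp — (4L, 5, "Hello") comes after (6L, 3, "Hello") — and the watermark assigner emits a watermark equal to each element's timestamp. Whether the 4L element is treated as late may therefore depend on how elements are distributed across parallel assigner instances.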

{code}
  // Test input as (timestamp, value, key) tuples. The first field is the event timestamp
  // (TimestampWithEqualWatermark.extractTimestamp returns element._1), and the test names
  // the fields 'long, 'int and 'string.
  // NOTE(review): the rows are not in timestamp order. (4L, 5, "Hello") comes after
  // (6L, 3, "Hello"), and (8L, ...) after (16L, ...). The assigner emits a watermark equal
  // to each element's timestamp, so whether the 4L row is behind the watermark (late) may
  // depend on how rows are spread across parallel assigner instances. This is a likely
  // source of the "Hello" count flipping between 1 and 2 — confirm.
  val data = List(
    (1L, 1, "Hi"),
    (2L, 2, "Hallo"),
    (3L, 2, "Hello"),
    (6L, 3, "Hello"),
    (4L, 5, "Hello"),
    (16L, 4, "Hello world"),
    (8L, 3, "Hello world"))

  /**
   * Reproduces FLINK-5735: an event-time sliding window whose size (5 ms) is smaller than
   * its slide (10 ms) produces non-deterministic results.
   *
   * Because size < slide, consecutive windows do not overlap ([0, 5), [10, 15), ...). Rows
   * are grouped by window and 'string. The test selects the key, the count of 'int, and the
   * window start/end, then compares the sorted output with the rows expected for [0, 5).
   *
   * NOTE(review): the job parallelism is not set, so the environment default applies. The
   * assigner emits a watermark equal to each timestamp, and `data` delivers timestamp 4
   * after timestamp 6. Together these presumably make the "Hello" count for [0, 5) either 1
   * or 2 depending on element distribution — confirm before changing the test.
   */
  @Test
  def testEventTimeSlidingWindowNonOverlapping(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // Reset the shared result buffer before the job runs; presumably StreamITCase.StringSink
    // appends each emitted row to it.
    StreamITCase.testResults = mutable.MutableList()

    // Event timestamps come from the first tuple field. A watermark equal to that timestamp
    // is emitted after every element.
    val stream = env
      .fromCollection(data)
      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
    val table = stream.toTable(tEnv, 'long, 'int, 'string)

    // 'rowtime is not one of the declared fields; presumably it refers to the event-time
    // attribute assigned above — verify against the Table API version in use.
    val windowedTable = table
      .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count, 'w.start, 'w.end)

    val results = windowedTable.toDataStream[Row]
    results.addSink(new StreamITCase.StringSink)
    env.execute()

    // Only the window [0, 5) is expected to produce rows. "Hello" should count both the
    // timestamp-3 and timestamp-4 rows, which is the value that intermittently comes out as 1.
    val expected = Seq(
      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
    // Sorting both sides makes the comparison independent of the order rows reach the sink.
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }

  /**
   * Punctuated timestamp/watermark assigner for `(timestamp, value, key)` tuples.
   *
   * The event timestamp of each element is its first tuple field. After every element, a
   * watermark carrying exactly that extracted timestamp is emitted, so the assigner itself
   * tolerates no out-of-orderness.
   */
  class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {

    /** Uses the first tuple field as the event time; the previous timestamp is ignored. */
    override def extractTimestamp(
        element: (Long, Int, String),
        previousElementTimestamp: Long): Long =
      element match {
        case (eventTime, _, _) => eventTime
      }

    /** Emits a watermark equal to the timestamp just extracted for `lastElement`. */
    override def checkAndGetNextWatermark(
        lastElement: (Long, Int, String),
        extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp)
  }
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)