Posted to issues@flink.apache.org by "Cody (JIRA)" <ji...@apache.org> on 2016/07/14 08:20:20 UTC

[jira] [Created] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator

Cody created FLINK-4215:
---------------------------

             Summary: timestamp of StreamRecord is lost in WindowOperator
                 Key: FLINK-4215
                 URL: https://issues.apache.org/jira/browse/FLINK-4215
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.0.3
            Reporter: Cody


On a WindowedStream, when the subsequent operator is a WindowOperator (created by applying a fold function), the timestamp of the emitted StreamRecord is lost. Here's my test code:
---------------------------------------------
  def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment):
  DataStream[(Int, Long, String, String)] = {
    val data = new mutable.MutableList[(Int, Long, String, String)]
    data.+=((1, 1L, "Hi", "2016-07-06 14:00:00"))
    data.+=((2, 2L, "Hello", "2016-07-06 14:01:00"))
    data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00"))
    env.fromCollection(data)
  }

  @Test
  def testTimestampInWindowOperator(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

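    val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env).assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] {
        // Returning null means no new watermark is emitted by this assigner.
        override def getCurrentWatermark: Watermark = null

        // Use the 4th field (a date-time string) as the event timestamp.
        // Note: DateFormat.getDateTimeInstance parses with the default locale's
        // pattern, so whether "2016-07-06 14:00:00" parses depends on the JVM locale.
        override def extractTimestamp(element: (Int, Long, String, String),
                                      previousElementTimestamp: Long): Long = {
          DateFormat.getDateTimeInstance.parse(element._4).getTime
        }
      }).keyBy(3).timeWindow(Time.milliseconds(1000))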
      .fold((0, 0L, "", ""),
        new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] {
        override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String))
        : (Int, Long, String, String) = {
          (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4)
        }
    }).addSink(new PrintSinkFunction[(Int, Long, String, String)]())

    env.execute()
  }
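
For reference, here is a minimal standalone sketch (no Flink dependency) of which timestamps and 1000 ms windows the three sample rows should map to. It is not part of the test above. The object name `WindowAssignmentSketch`, the helpers `parseMillis` and `windowStart`, the explicit `yyyy-MM-dd HH:mm:ss` pattern and the UTC zone are all assumptions made for illustration, and the window arithmetic is the usual tumbling-window rule (start = ts - ts % size), not code taken from WindowOperator.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object WindowAssignmentSketch {
  // Parse with an explicit pattern and zone so the result does not depend on
  // the JVM's default locale or time zone. (UTC is an assumption; the report
  // does not state a zone.)
  def parseMillis(s: String): Long = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
    fmt.parse(s).getTime
  }

  // Start of the tumbling window that contains `ts`
  // (assumes non-negative timestamps and no window offset).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  def main(args: Array[String]): Unit = {
    // Same sample rows as in the test data helper above.
    val rows = Seq(
      (1, 1L, "Hi", "2016-07-06 14:00:00"),
      (2, 2L, "Hello", "2016-07-06 14:01:00"),
      (3, 2L, "Hello world", "2016-07-06 14:02:00"))
    val sizeMs = 1000L

    rows.foreach { r =>
      val ts = parseMillis(r._4)
      val start = windowStart(ts, sizeMs)
      println(s"${r._3}: timestamp=$ts window=[$start, ${start + sizeMs})")
    }
  }
}
```

Because the rows are one minute apart and the stream is keyed on the date-time string (field 3), each row should fall into its own window, so every fold result would be built from a single input element.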



