You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Cody (JIRA)" <ji...@apache.org> on 2016/07/14 08:52:20 UTC

[jira] [Closed] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator

     [ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cody closed FLINK-4215.
-----------------------
    Resolution: Invalid

> timestamp of StreamRecord is lost in WindowOperator
> ---------------------------------------------------
>
>                 Key: FLINK-4215
>                 URL: https://issues.apache.org/jira/browse/FLINK-4215
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: Cody
>
> In a WindowedStream, if the subsequent operator is a WindowOperator(by applying a fold function), the timestamp of StreamRecord will be lost. Here's my test code:
> ---------------------------------------------
>   def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment):
>   DataStream[(Int, Long, String, String)] = {
>     val data = new mutable.MutableList[(Int, Long, String, String)]
>     data.+=((1, 1L, "Hi", "2016-07-06 14:00:00"))
>     data.+=((2, 2L, "Hello", "2016-07-06 14:01:00"))
>     data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00"))
>     env.fromCollection(data)
>   }
>   @Test
>   def testTimestampInWindowOperator(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env).assignTimestampsAndWatermarks(
>     new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] {
>       override def getCurrentWatermark: Watermark = null
>       override def extractTimestamp(element: (Int, Long, String, String),
>                                     previousElementTimestamp: Long): Long = {
>         DateFormat.getDateTimeInstance.parse(element._4).getTime
>       }
>     }).keyBy(3).timeWindow(Time.milliseconds(1000))
>       .fold((0, 0L, "", ""),
>         new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] {
>         override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String))
>         : (Int, Long, String, String) = {
>           (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4)
>         }
>     }).addSink(new PrintSinkFunction[(Int, Long, String, String)]())
>     env.execute()
>   }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)