You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Cody (JIRA)" <ji...@apache.org> on 2016/07/14 08:52:20 UTC
[jira] [Closed] (FLINK-4215) timestamp of StreamRecord is lost in
WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cody closed FLINK-4215.
-----------------------
Resolution: Invalid
> timestamp of StreamRecord is lost in WindowOperator
> ---------------------------------------------------
>
> Key: FLINK-4215
> URL: https://issues.apache.org/jira/browse/FLINK-4215
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3
> Reporter: Cody
>
> In a WindowedStream, if the subsequent operator is a WindowOperator(by applying a fold function), the timestamp of StreamRecord will be lost. Here's my test code:
> ---------------------------------------------
> def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment):
> DataStream[(Int, Long, String, String)] = {
> val data = new mutable.MutableList[(Int, Long, String, String)]
> data.+=((1, 1L, "Hi", "2016-07-06 14:00:00"))
> data.+=((2, 2L, "Hello", "2016-07-06 14:01:00"))
> data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00"))
> env.fromCollection(data)
> }
> @Test
> def testTimestampInWindowOperator(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env).assignTimestampsAndWatermarks(
> new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] {
> override def getCurrentWatermark: Watermark = null
> override def extractTimestamp(element: (Int, Long, String, String),
> previousElementTimestamp: Long): Long = {
> DateFormat.getDateTimeInstance.parse(element._4).getTime
> }
> }).keyBy(3).timeWindow(Time.milliseconds(1000))
> .fold((0, 0L, "", ""),
> new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] {
> override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String))
> : (Int, Long, String, String) = {
> (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4)
> }
> }).addSink(new PrintSinkFunction[(Int, Long, String, String)]())
> env.execute()
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)