Posted to user@flink.apache.org by 刘 文 <th...@yahoo.com> on 2019/03/06 14:51:14 UTC

DataStream EventTime last data cannot be output?

DataStream EventTime: the last data cannot be output?


While verifying EventTime processing with watermarks, I found that the last records sent to the socket are either output late or never output at all.
1) A window only fires once a later record arrives whose timestamp exceeds the window end + maxOutOfOrderness, i.e. once getCurrentWatermark() passes the window end.
2) As a result, the latest records cannot be processed in time, or are never processed.
3) How can I deal with this problem?
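
A concrete example with the numbers from the program below (3-second windows, maxOutOfOrderness = 2 s): the window [12:00:00, 12:00:03) only fires when the watermark, currentMaxTimestamp - 2 s, reaches 12:00:03, which requires a record with timestamp >= 12:00:05 to arrive first. If nothing newer is ever sent to the socket, that window never fires and its records are never output.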



The following is the Flink program (Flink 1.7.2):
---------------------------------------------------------------------------



package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime

import java.util.Date

import com.alibaba.fastjson.JSON
import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


object SockWordCountRun {



  def main(args: Array[String]): Unit = {


    // get the execution environment
   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val configuration : Configuration = ConfigurationUtil.getConfiguration(true)

    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)


    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)



    import org.apache.flink.streaming.api.scala._
    val dataStream = env.socketTextStream("localhost", 1234, '\n')

     // .setParallelism(3)


    dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {

        val maxOutOfOrderness = 2 * 1000L // 2 seconds
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark =  new Watermark(currentMaxTimestamp - maxOutOfOrderness)

        override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
          val jsonObject = JSON.parseObject(element)

          val timestamp = jsonObject.getLongValue("extract_data_time")
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)

        /*  println("===========watermark begin===========")
          println()
          println(new Date(currentMaxTimestamp - 20 * 1000))
          println(jsonObject)
          println("===========watermark end===========")
          println()*/
          timestamp
        }

      })
      .timeWindowAll(Time.seconds(3))

      .process(new ProcessAllWindowFunction[String, String, TimeWindow]() {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          println()
          println("start emitting window")
          println(new Date())
          for (e <- elements) out.collect(e)
          println("finish emitting window")
          println(new Date())
          println()
        }
      })

      .print()
      //.setParallelism(3)

    println("==================================以下为执行计划==================================")
    println("执行地址(firefox效果更好):https://flink.apache.org/visualizer")
    //执行计划
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================以上为执行计划 JSON串==================================\n")


    env.execute("Socket watermark job")

    println("done")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long){
    //override def toString: String = Thread.currentThread().getName + word + " : " + count
  }


  def getConfiguration(isDebug:Boolean = false):Configuration = {

    val configuration : Configuration = new Configuration()

    if(isDebug){
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout",timeout)
      configuration.setString("akka.lookup.timeout",timeout)
      configuration.setString("akka.tcp.timeout",timeout)
      configuration.setString("akka.transport.heartbeat.interval",timeout)
      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause",timeout)
      configuration.setInteger("heartbeat.interval",10000000)
      configuration.setInteger("heartbeat.timeout",50000000)
    }


    configuration
  }


}


best   thinktothings


Re: DataStream EventTime last data cannot be output?

Posted by Stephen Connolly <st...@gmail.com>.
I had this issue myself.

Your timestamp assigner only advances the watermark as it receives data,
so when you reach the end of the input, the newest records sit in a window
whose end the watermark never passes, and that window never fires.

One solution is to have the source flag that there will be no more data, as
in the sketch below. If you can do this, then that is the best solution.
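
A minimal sketch of that idea, assuming a hypothetical convention that a
sentinel record {"extract_data_time": -1} marks end of input (the sentinel is
my invention, not a Flink API). A punctuated assigner can emit a watermark per
element, so the sentinel pushes the watermark to Long.MaxValue and flushes
every pending window:

import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class EndOfStreamAssigner extends AssignerWithPunctuatedWatermarks[String] {
  private val maxOutOfOrderness = 2 * 1000L // same bound as the program above
  private var currentMaxTimestamp = 0L

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    val ts = JSON.parseObject(element).getLongValue("extract_data_time")
    currentMaxTimestamp = math.max(ts, currentMaxTimestamp)
    ts
  }

  // Called once per element, right after extractTimestamp; a negative
  // timestamp is our end-of-input flag.
  override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long): Watermark =
    if (extractedTimestamp < 0) new Watermark(Long.MaxValue) // flush all windows
    else new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}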

Another solution is to mix event time and wall-clock time when advancing the
watermark, so the window will eventually move past and output the data; a
sketch follows. Note that if you use this approach while reprocessing data,
the wall clock will differ between runs, so your data may be grouped
differently and you could see different results depending on what kind of
computation you are using.
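
A minimal sketch of that mixing, with an illustrative idleTimeout parameter
(my name, not a Flink option): while data flows, the watermark tracks event
time as usual; once the source has been silent for longer than idleTimeout,
the watermark falls back to the wall clock so pending windows eventually fire.

import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class WallClockFallbackAssigner(maxOutOfOrderness: Long, idleTimeout: Long)
    extends AssignerWithPeriodicWatermarks[String] {

  private var currentMaxTimestamp = 0L
  private var lastElementWallClock = System.currentTimeMillis()

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    val ts = JSON.parseObject(element).getLongValue("extract_data_time")
    currentMaxTimestamp = math.max(ts, currentMaxTimestamp)
    lastElementWallClock = System.currentTimeMillis()
    ts
  }

  // Invoked periodically (interval set via env.getConfig.setAutoWatermarkInterval).
  override def getCurrentWatermark: Watermark = {
    val now = System.currentTimeMillis()
    if (now - lastElementWallClock > idleTimeout)
      new Watermark(now - maxOutOfOrderness) // source idle: advance with the wall clock
    else
      new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }
}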

The next gotcha that I hit was parallelism: if you are assigning timestamps
in a parallel task (say, after a keyBy), then each of the parallel tasks has
its own watermark assigner. If your data is poorly distributed for your key
function, one of those parallel assigners may only ever see one or zero
records, its watermark never advances, and because the downstream watermark
is the minimum across all inputs, all data output is blocked forever!
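
The usual workaround, sketched here against the program above, is to assign
timestamps and watermarks on the source stream before any keyBy, so a single
assigner sees every record (keying by the raw line is just for illustration):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val withTimestamps = env
  .socketTextStream("localhost", 1234, '\n')
  .assignTimestampsAndWatermarks(new EndOfStreamAssigner) // e.g. the sketch above
// every keyed subtask now inherits watermarks from the single upstream assigner
val keyed = withTimestamps.keyBy(line => line)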

This is all hinted at in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
but it could be more explicit;
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources
describes your exact issue.

HTH
