You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Soumya Simanta <so...@gmail.com> on 2016/06/07 04:40:05 UTC

Window start and end issue with TumblingProcessingTimeWindows

I've a simple program which takes some inputs from a command line (Socket
stream) and then aggregates based on the key.

When running this program on my local machine, I see some output that is
counterintuitive to my understanding of windows in Flink.

The start time of the Window is around the time the Functions are being
evaluated. However, *the window end time is around 60 s (window size) after
the current time (please see below). *

Can someone explain this behaviour please?

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/** Aggregated result emitted for one key in one window.
  *
  * @param start window start, taken from `TimeWindow.getStart` by the producer
  * @param end   window end, taken from `TimeWindow.getEnd` by the producer
  * @param key   key the aggregated events were grouped under
  * @param value sum of the `value` field of the events in the window
  */
case class EventAgg(
  start: Long,
  end: Long,
  key: String,
  value: Int
)

/** Flink streaming job that reads comma-separated lines from a local socket,
  * groups the parsed events by key, sums their values over tumbling
  * processing-time windows and prints one [[EventAgg]] per key and window.
  */
object Processor {

  /** Size of the tumbling window, in milliseconds. */
  val window_length = 60000 // milliseconds

  /** Window function: sums the values of all events of one key in one window.
    *
    * Prints a diagnostic line with the window's hash code, the current wall-clock
    * time and the window bounds, then emits a single [[EventAgg]].
    *
    * @param key    key shared by every event in `in`
    * @param window window being evaluated; its start and end are copied into the result
    * @param in     events assigned to this key and window
    * @param out    collector that receives the aggregated record
    */
  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event],
                      out: Collector[EventAgg]): Unit = {
    val sum = in.foldLeft(0)((acc, e) => acc + e.value)
    val start = window.getStart
    val end = window.getEnd
    val diff = end - start
    // Kept as one logical string: a plain s"..." literal cannot span several source lines.
    println(
      s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} " +
        s"key:[$key] start: $start end: $end diff: $diff")

    // Reuse the bounds read above instead of querying the window a second time.
    out.collect(EventAgg(start = start, end = end, key = key, value = sum))
  }

  /** Entry point: builds the pipeline and runs it.
    *
    * @param Args command-line arguments (unused)
    */
  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val sevents = env.socketTextStream("localhost", 9000)
    sevents
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      // The timestamp is taken when each aggregate passes through this map.
      .map(agg => s"Default Assigner: ${System.currentTimeMillis()} - $agg")
      .print()

    env.execute("Event time windows")
  }

  /** Parses one input line of the form `key,value[,...]` into an `Event`.
    *
    * A `null` or blank line yields `Event("default", 0, 0L)`. Otherwise the first
    * comma-separated field becomes the key and the second is converted with `toInt`;
    * the third constructor argument is always `1L`.
    * NOTE(review): the meaning of the third `Event` field is not visible here — confirm.
    *
    * @param s raw line read from the socket
    * @return the parsed event
    * @throws NumberFormatException if the second field is not a valid integer
    * @throws ArrayIndexOutOfBoundsException if the line contains no second field
    */
  def parseEvent(s: String): Event = {
    if (s == null || s.trim.isEmpty)
      Event("default", 0, 0L)
    else {
      val parts = s.split(",")
      Event(parts(0), parts(1).toInt, 1L)
    }
  }
}


*Output*

 windowId: -663519360 currenttime: 1465234200007 key:[a] start:
1465234200000 end: 1465234260000 diff: 60000
 windowId: -663519360 currenttime: 1465234200006 key:[b] start:
1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 -
EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 -
EventAgg(1465234200000,1465234260000,b,4)

Re: Window start and end issue with TumblingProcessingTimeWindows

Posted by Soumya Simanta <so...@gmail.com>.
Thanks for the clarification.

On Tue, Jun 7, 2016 at 9:15 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> I'm afraid you're running into a bug into the special processing-time
> window operator. A suggested workaround would be to switch to
> characteristic IngestionTime and use TumblingEventTimeWindows.
>
> I also open a Jira issue for the bug so that we can keep track of it:
> https://issues.apache.org/jira/browse/FLINK-4028
>
> Cheers,
> Aljoscha
>
> On Tue, 7 Jun 2016 at 14:57 Soumya Simanta <so...@gmail.com>
> wrote:
>
>> The problem is why is the window end time in the future ?
>>
>> For example if my window size is 60 seconds and my window is being
>> evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00
>> pm even when the data that is being evaluated falls in the window 2.59 -
>> 3.00.
>>
>> Sent from my iPhone
>>
>> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <ch...@apache.org> wrote:
>>
>> could you state a specific problem?
>>
>> On 07.06.2016 06:40, Soumya Simanta wrote:
>>
>> I've a simple program which takes some inputs from a command line (Socket
>> stream) and then aggregates based on the key.
>>
>> When running this program on my local machine I see some output that is
>> counter intuitive to my understanding of windows in Flink.
>>
>> The start time of the Window is around the time the Functions are being
>> evaluated. However, *the window end time is around 60 s (window size)
>> after the current time (please see below). *
>>
>> Can someone explain this behaviour please?
>>
>> import org.apache.flink.api.scala._import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindowsimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector
>> case class EventAgg(start: Long, end: Long, key: String, value: Int)
>> object Processor {
>>
>>   val window_length = 60000 // milliseconds  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
>>     var sum = 0    for (e <- in) {
>>       sum = sum + e.value
>>     }
>>     val start = window.getStart
>>     val end = window.getEnd
>>     val diff = (end - start)
>>     println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
>>
>>
>>     out.collect(
>>       new EventAgg(
>>         start = window.getStart,
>>         end = window.getEnd,
>>         key = key,
>>         value = sum
>>       )
>>     )
>>   }
>>
>>   def main(Args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)    val sevents = env.socketTextStream("localhost", 9000)
>>     sevents
>>       .map(x => parseEvent(x))
>>       .keyBy(_.key)
>>       .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>>       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>>       .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
>>       .print()
>>
>>     env.execute("Event time windows")
>>   }
>>
>>   def parseEvent(s: String): Event = {
>>     if (s == null || s.trim().length == 0)
>>       Event("default", 0, 0L)
>>     else {
>>       val parts = s.split(",")
>>       Event(parts(0), parts(1).toInt, 1L)
>>     }
>>   }
>> }
>>
>>
>> *Output*
>>
>>  windowId: -663519360 currenttime: 1465234200007 key:[a] start:
>> 1465234200000 end: 1465234260000 diff: 60000
>>  windowId: -663519360 currenttime: 1465234200006 key:[b] start:
>> 1465234200000 end: 1465234260000 diff: 60000
>> 3> Default Assigner: 1465234200010 -
>> EventAgg(1465234200000,1465234260000,a,3)
>> 7> Default Assigner: 1465234200010 -
>> EventAgg(1465234200000,1465234260000,b,4)
>>
>>
>>
>>

Re: Window start and end issue with TumblingProcessingTimeWindows

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I'm afraid you're running into a bug in the special processing-time
window operator. A suggested workaround would be to switch to the
IngestionTime time characteristic and use TumblingEventTimeWindows.

I also opened a Jira issue for the bug so that we can keep track of it:
https://issues.apache.org/jira/browse/FLINK-4028

Cheers,
Aljoscha

On Tue, 7 Jun 2016 at 14:57 Soumya Simanta <so...@gmail.com> wrote:

> The problem is why is the window end time in the future ?
>
> For example if my window size is 60 seconds and my window is being
> evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00
> pm even when the data that is being evaluated falls in the window 2.59 -
> 3.00.
>
> Sent from my iPhone
>
> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <ch...@apache.org> wrote:
>
> could you state a specific problem?
>
> On 07.06.2016 06:40, Soumya Simanta wrote:
>
> I've a simple program which takes some inputs from a command line (Socket
> stream) and then aggregates based on the key.
>
> When running this program on my local machine I see some output that is
> counter intuitive to my understanding of windows in Flink.
>
> The start time of the Window is around the time the Functions are being
> evaluated. However, *the window end time is around 60 s (window size)
> after the current time (please see below). *
>
> Can someone explain this behaviour please?
>
> import org.apache.flink.api.scala._import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindowsimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector
> case class EventAgg(start: Long, end: Long, key: String, value: Int)
> object Processor {
>
>   val window_length = 60000 // milliseconds  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
>     var sum = 0    for (e <- in) {
>       sum = sum + e.value
>     }
>     val start = window.getStart
>     val end = window.getEnd
>     val diff = (end - start)
>     println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
>
>
>     out.collect(
>       new EventAgg(
>         start = window.getStart,
>         end = window.getEnd,
>         key = key,
>         value = sum
>       )
>     )
>   }
>
>   def main(Args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)    val sevents = env.socketTextStream("localhost", 9000)
>     sevents
>       .map(x => parseEvent(x))
>       .keyBy(_.key)
>       .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>       .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
>       .print()
>
>     env.execute("Event time windows")
>   }
>
>   def parseEvent(s: String): Event = {
>     if (s == null || s.trim().length == 0)
>       Event("default", 0, 0L)
>     else {
>       val parts = s.split(",")
>       Event(parts(0), parts(1).toInt, 1L)
>     }
>   }
> }
>
>
> *Output*
>
>  windowId: -663519360 currenttime: 1465234200007 key:[a] start:
> 1465234200000 end: 1465234260000 diff: 60000
>  windowId: -663519360 currenttime: 1465234200006 key:[b] start:
> 1465234200000 end: 1465234260000 diff: 60000
> 3> Default Assigner: 1465234200010 -
> EventAgg(1465234200000,1465234260000,a,3)
> 7> Default Assigner: 1465234200010 -
> EventAgg(1465234200000,1465234260000,b,4)
>
>
>
>

Re: Window start and end issue with TumblingProcessingTimeWindows

Posted by Soumya Simanta <so...@gmail.com>.
The problem is: why is the window end time in the future?

For example, if my window size is 60 seconds and my window is being evaluated at 3.00 pm, then why is the window end time 3.01 pm and not 3.00 pm, even when the data that is being evaluated falls in the window 2.59 - 3.00?

Sent from my iPhone

> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> could you state a specific problem?
> 
>> On 07.06.2016 06:40, Soumya Simanta wrote:
>> I've a simple program which takes some inputs from a command line (Socket stream) and then aggregates based on the key. 
>> 
>> When running this program on my local machine I see some output that is counter intuitive to my understanding of windows in Flink. 
>> 
>> The start time of the Window is around the time the Functions are being evaluated. However, the window end time is around 60 s (window size) after the current time (please see below).  
>> 
>> Can someone explain this behaviour please? 
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>> import org.apache.flink.util.Collector
>> 
>> case class EventAgg(start: Long, end: Long, key: String, value: Int)
>> 
>> object Processor {
>> 
>>   val window_length = 60000 // milliseconds
>> 
>>   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
>>     var sum = 0
>>     for (e <- in) {
>>       sum = sum + e.value
>>     }
>>     val start = window.getStart
>>     val end = window.getEnd
>>     val diff = (end - start)
>>     println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
>> 
>> 
>>     out.collect(
>>       new EventAgg(
>>         start = window.getStart,
>>         end = window.getEnd,
>>         key = key,
>>         value = sum
>>       )
>>     )
>>   }
>> 
>>   def main(Args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> 
>>     val sevents = env.socketTextStream("localhost", 9000)
>>     sevents
>>       .map(x => parseEvent(x))
>>       .keyBy(_.key)
>>       .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>>       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>>       .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
>>       .print()
>> 
>>     env.execute("Event time windows")
>>   }
>> 
>>   def parseEvent(s: String): Event = {
>>     if (s == null || s.trim().length == 0)
>>       Event("default", 0, 0L)
>>     else {
>>       val parts = s.split(",")
>>       Event(parts(0), parts(1).toInt, 1L)
>>     }
>>   }
>> }
>> 
>> Output
>> 
>>  windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
>>  windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
>> 3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
>> 7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)
>> 
>> 
> 

Re: Window start and end issue with TumblingProcessingTimeWindows

Posted by Chesnay Schepler <ch...@apache.org>.
could you state a specific problem?

On 07.06.2016 06:40, Soumya Simanta wrote:
> I've a simple program which takes some inputs from a command line 
> (Socket stream) and then aggregates based on the key.
>
> When running this program on my local machine I see some output that 
> is counter intuitive to my understanding of windows in Flink.
>
> The start time of the Window is around the time the Functions are 
> being evaluated. However, *the window end time is around 60 s (window 
> size) after the current time (please see below). *
>
> Can someone explain this behaviour please?
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
>
> case class EventAgg(start: Long, end: Long, key:String, value: Int)
>
> object Processor {
>
>    val window_length =60000 // milliseconds def aggregateEvents(key:String, window: TimeWindow, in:Iterable[Event], out: Collector[EventAgg]): Unit = {
>      var sum =0 for (e <- in) {
>        sum = sum + e.value
>      }
>      val start = window.getStart
>      val end = window.getEnd
>      val diff = (end - start)
>      println(s" windowId: ${window.hashCode()}currenttime: ${System.currentTimeMillis()}key:[$key] start: $startend: $enddiff: $diff")
>
>
>      out.collect(
>        new EventAgg(
>          start = window.getStart,
>          end = window.getEnd,
>          key = key,
>          value = sum
>        )
>      )
>    }
>
>    def main(Args: Array[String]): Unit = {
>      val env = StreamExecutionEnvironment.getExecutionEnvironment 
> //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>      //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val sevents = env.socketTextStream("localhost",9000)
>      sevents
>        .map(x =>parseEvent(x))
>        .keyBy(_.key)
>        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>        .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>        .map("Default Assigner: " + System.currentTimeMillis().toString +" - " + _.toString)
>        .print()
>
>      env.execute("Event time windows")
>    }
>
>    def parseEvent(s:String): Event = {
>      if (s ==null || s.trim().length ==0)
>        Event("default",0,0L)
>      else {
>        val parts = s.split(",")
>        Event(parts(0), parts(1).toInt,1L)
>      }
>    }
> }
>
> *_Output_*
>
>  windowId: -663519360 currenttime: 1465234200007 key:[a] start: 
> 1465234200000 end: 1465234260000 diff: 60000
>  windowId: -663519360 currenttime: 1465234200006 key:[b] start: 
> 1465234200000 end: 1465234260000 diff: 60000
> 3> Default Assigner: 1465234200010 - 
> EventAgg(1465234200000,1465234260000,a,3)
> 7> Default Assigner: 1465234200010 - 
> EventAgg(1465234200000,1465234260000,b,4)
>
>