Posted to user@flink.apache.org by shyla deshpande <de...@gmail.com> on 2018/08/05 03:24:07 UTC

Event Time Session Window does not trigger..

Hi,

I used PopularPlacesFromKafka from
dataartisans.flinktraining.exercises as the basis and made only very minor
changes, but the session window is never triggered. If I use ProcessingTime
instead of EventTime, it works. Here is my code.

Appreciate any help. Thanks

import java.time.OffsetDateTime
import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._ // enables mapWith
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// PositionEventProto and PositionEventProtoSchema are project classes (not shown).

object KafkaEventTimeWindow {

  private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
  private val LOCAL_KAFKA_BROKER = "localhost:9092"
  private val CON_GROUP = "KafkaEventTimeSessionWindow"
  private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 seconds

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaProps = new Properties
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST) // only used by the Kafka 0.8 consumer
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
    kafkaProps.setProperty("group.id", CON_GROUP)
    kafkaProps.setProperty("auto.offset.reset", "earliest")

    val consumer = new FlinkKafkaConsumer011[PositionEventProto](
      "positionevent",
      new PositionEventProtoSchema,
      kafkaProps)
    // timestamps and watermarks are generated per Kafka partition inside the consumer
    consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)

    val posstream = env.addSource(consumer)

    // ISO-8601 date-time with offset -> epoch milliseconds
    def convtoepochmilli(cdt: String): Long =
      OffsetDateTime.parse(cdt).toInstant.toEpochMilli

    val outputstream = posstream
      .mapWith { p =>
        (p.getConsumerUserId, convtoepochmilli(p.getCreateDateTime.getInIso8601Format))
      }
      .keyBy(0) // key by consumer user id
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      // per session, keep the user id and the latest event timestamp
      .reduce { (v1, v2) => (v1._1, Math.max(v1._2, v2._2)) }

    outputstream.print()

    // execute the transformation pipeline
    env.execute("Output Stream")
  }

}

class PositionEventProtoTSAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) {

  // event time = the record's creation date-time, in epoch milliseconds
  override def extractTimestamp(pos: PositionEventProto): Long =
    OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format).toInstant.toEpochMilli
}
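A Flink-free sketch of why the session window above can stay open, assuming Flink 1.x semantics: BoundedOutOfOrdernessTimestampExtractor sets the watermark to the highest timestamp seen minus the bound (60 s here), each event opens a session window [t, t + gap) that merges with overlapping ones, and the event-time trigger fires once the watermark reaches the window end minus 1 ms. The model ignores periodic watermark emission and the per-partition minimum taken by the Kafka consumer; SessionWatermarkModel and its methods are hypothetical names, not Flink API.

```scala
// Flink-free model; object and method names are hypothetical, not Flink APIs.
object SessionWatermarkModel {

  // A merged session window for one key: [start, end), end exclusive.
  final case class Session(start: Long, end: Long)

  // Each event at t opens [t, t + gap); overlapping or touching windows are
  // merged into one session (an assumption of this model).
  def sessions(timestamps: Seq[Long], gapMs: Long): List[Session] =
    timestamps.sorted.foldLeft(List.empty[Session]) {
      case (last :: rest, t) if t <= last.end =>
        Session(last.start, math.max(last.end, t + gapMs)) :: rest
      case (acc, t) =>
        Session(t, t + gapMs) :: acc
    }.reverse

  // Bounded-out-of-orderness watermark: highest timestamp seen minus the bound.
  def watermark(timestamps: Seq[Long], boundMs: Long): Long = {
    require(timestamps.nonEmpty, "no events, no watermark")
    timestamps.max - boundMs
  }

  // A session's event-time timer fires once watermark >= end - 1.
  def fired(timestamps: Seq[Long], gapMs: Long, boundMs: Long): List[Session] = {
    val wm = watermark(timestamps, boundMs)
    sessions(timestamps, gapMs).filter(s => wm >= s.end - 1)
  }

  // Smallest event timestamp that pushes the watermark far enough to close s.
  def closingTimestamp(s: Session, boundMs: Long): Long = s.end - 1 + boundMs
}
```

With events at 0 s, 30 s and 200 s, the model's watermark reaches 140 s, so only the first session (0 s to 90 s) fires; the session opened at 200 s needs a later event at or after 319.999 s before it can close.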

Re: Event Time Session Window does not trigger..

Posted by shyla deshpande <de...@gmail.com>.
Hi Hequn and Fabian,
Thanks. Appreciate your help

On Mon, Aug 6, 2018 at 1:32 AM, Fabian Hueske <fh...@gmail.com> wrote:

Re: Event Time Session Window does not trigger..

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

By setting the time characteristic to EventTime, you enable the internal
handling of record timestamps and watermarks.
In contrast to EventTime, ProcessingTime does not require any additional
data.

You can use both EventTime and ProcessingTime in the same application and
StreamExecutionEnvironment.
However, if you enable EventTime, it becomes the default mode for some API
methods that create time-based operators, and you will need to explicitly
create ProcessingTime operators if you want to work in ProcessingTime.
For example, the stream.keyBy(...).timeWindow(Time.minutes(1)) shortcut
creates an EventTime tumbling window if the TimeCharacteristic is set to
EventTime, and a ProcessingTime tumbling window if it is set to ProcessingTime.
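The shortcut behaviour described above can be modelled without Flink, as a sketch: the environment-wide setting only decides which assigner timeWindow(size) builds, and the two assigners differ in which clock they read. The names below are hypothetical stand-ins for Flink's TumblingEventTimeWindows and TumblingProcessingTimeWindows; window starts assume a zero offset and non-negative timestamps.

```scala
// Flink-free model of the timeWindow(size) shortcut; all names are hypothetical.
object TimeWindowShortcutModel {

  sealed trait Characteristic
  case object ProcessingTime extends Characteristic
  case object EventTime extends Characteristic

  sealed trait Assigner
  final case class TumblingProcessingTime(sizeMs: Long) extends Assigner
  final case class TumblingEventTime(sizeMs: Long) extends Assigner

  // The shortcut picks the assigner from the environment-wide setting.
  def timeWindow(env: Characteristic, sizeMs: Long): Assigner = env match {
    case ProcessingTime => TumblingProcessingTime(sizeMs)
    case EventTime      => TumblingEventTime(sizeMs)
  }

  // Start of the tumbling window that contains ts (zero offset, ts >= 0).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Event-time windows bucket by the record timestamp,
  // processing-time windows by the operator's wall clock.
  def assign(a: Assigner, recordTs: Long, wallClockMs: Long): Long = a match {
    case TumblingEventTime(size)      => windowStart(recordTs, size)
    case TumblingProcessingTime(size) => windowStart(wallClockMs, size)
  }
}
```

In an actual job running with EventTime, writing the assigner explicitly, e.g. .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) or .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60))), gives a processing-time operator alongside event-time ones.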

Best,
Fabian

2018-08-06 4:30 GMT+02:00 Hequn Cheng <ch...@gmail.com>:

Re: Event Time Session Window does not trigger..

Posted by Hequn Cheng <ch...@gmail.com>.
Hi anna, shyla

When you call env.setStreamTimeCharacteristic(...), it sets the time
characteristic for all streams created from this environment. So if your
application contains multiple environments, then yes.

Best, Hequn

On Mon, Aug 6, 2018 at 9:37 AM, shyla deshpande <de...@gmail.com>
wrote:

Re: Event Time Session Window does not trigger..

Posted by shyla deshpande <de...@gmail.com>.
Hi Hequn,

I now realize that in production, data will not be a problem since this
will be a high-volume Kafka topic.
So, I will go with EventTime.

Still, I would like to know if I can use both
TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an
application.

*Thanks, the link you provided saved me time.*

*-shyla*

On Sun, Aug 5, 2018 at 9:28 AM, anna stax <an...@gmail.com> wrote:

Re: Event Time Session Window does not trigger..

Posted by anna stax <an...@gmail.com>.
Hi Hequn,

Thanks for the link. It looks like I had better use ProcessingTime instead of
EventTime, especially because of the 4th reason you listed:
"Data should cover a longer time span than the window size to advance the
event time."
I need the window to trigger when the data stops.
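The 4th reason can be illustrated with a minimal Flink-free sketch of the last session once records stop arriving. It assumes the processing-time trigger fires when the wall clock reaches the session end minus 1 ms, and the event-time trigger fires when a bounded-out-of-orderness watermark (highest timestamp minus the bound) does; DataStopsModel is a hypothetical name, not a Flink API.

```scala
// Flink-free sketch; DataStopsModel is a hypothetical name, not a Flink API.
object DataStopsModel {

  // The session holding the latest record ends (exclusive) at latest + gap.
  def lastSessionEnd(times: Seq[Long], gapMs: Long): Long = {
    require(times.nonEmpty, "need at least one record")
    times.max + gapMs
  }

  // Processing time: the timer is on the wall clock, which keeps advancing
  // after the last record has arrived.
  def processingTimeFired(arrivalsMs: Seq[Long], gapMs: Long, nowMs: Long): Boolean =
    nowMs >= lastSessionEnd(arrivalsMs, gapMs) - 1

  // Event time: with no newer record the watermark stays at latest - bound,
  // so with a non-negative bound and a gap longer than 1 ms this is false.
  def eventTimeFired(eventTimesMs: Seq[Long], gapMs: Long, boundMs: Long): Boolean =
    eventTimesMs.max - boundMs >= lastSessionEnd(eventTimesMs, gapMs) - 1
}
```

Because the watermark only moves when newer records arrive, eventTimeFired stays false after the data stops, while the wall clock eventually closes the processing-time session.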

I have one more question.

Can I set the TimeCharacteristic at the stream level instead of the
application level?
Can I use both TimeCharacteristic.ProcessingTime and
TimeCharacteristic.EventTime in an application?

Thank you

On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <ch...@gmail.com> wrote:

Re: Event Time Session Window does not trigger..

Posted by Hequn Cheng <ch...@gmail.com>.
Hi shyla,

I answered a similar question on Stack Overflow [1]; you can take a look at
it first.

Best, Hequn

[1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger

On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <de...@gmail.com>
wrote:
