Posted to user@flink.apache.org by shashank agarwal <sh...@gmail.com> on 2018/01/03 13:29:13 UTC

Flink CEP with event time

Hello,

I have some patterns in my program. For example:

 A followedBy B.

I am using a Kafka source, and my event APIs sit behind load balancers, so
sometimes B arrives before A. My CEP doesn't generate any result for those
events.

I then tried event time and applied a
"BoundedOutOfOrdernessTimestampExtractor" on the Kafka source, extracting the
timestamp from an origin-time variable that I have in the event. I am using a
watermark lateness of 10 seconds.

Now CEP has stopped generating results. It doesn't even generate results when
event B arrives after A. I have also tried within(10 seconds) in CEP, but it
still doesn't generate results.

Am I doing anything wrong?

I have to cover the case where B can come after A from Kafka.

-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....

Re: Flink CEP with event time

Posted by Aljoscha Krettek <al...@apache.org>.
What are the actual timestamps? If your BoundedOutOfOrdernessTimestampExtractor lags by 10 seconds, then seeing only Event 1.B does not advance the watermark far enough to trigger execution. Only the later Event 2.A is sufficiently far ahead to trigger it, which is the behaviour you actually observe.
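
A minimal sketch of this lag, in plain Scala with no Flink dependency. It only models the rule "watermark = highest timestamp seen so far minus the bound". The timestamps and the names WatermarkLag and watermarksFor are made up for illustration and are not taken from the job discussed here.

```scala
object WatermarkLag {
  // Hypothetical bound, mirroring Time.seconds(10) in the extractor.
  val maxOutOfOrdernessMs: Long = 10000L

  // Watermark after each event: highest timestamp seen so far minus the bound.
  def watermarksFor(timestamps: Seq[Long]): Seq[Long] =
    timestamps
      .scanLeft(Long.MinValue)((maxSeen, ts) => math.max(maxSeen, ts))
      .tail
      .map(_ - maxOutOfOrdernessMs)

  def main(args: Array[String]): Unit = {
    // Event 1.A, Event 1.B, Event 2.A (made-up event times in milliseconds).
    val events = Seq(100000L, 101000L, 115000L)
    val timestampOf1B = 101000L
    events.zip(watermarksFor(events)).foreach { case (ts, wm) =>
      println(s"event at $ts -> watermark $wm, 1.B released: ${wm >= timestampOf1B}")
    }
  }
}
```

In this model the watermark reaches the timestamp of Event 1.B only after Event 2.A has been seen, which matches the one-event delay described in this thread.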

> On 3. Jan 2018, at 17:05, shashank agarwal <sh...@gmail.com> wrote:
> 
> Low Watermark is showing the same value which I am passing in event "1514994744412" for all the tasks related to that stream, (No watermark) is showing for Kafka source in UI.
> 
> So the pattern is following for CEP A followedBy B :
> 
> Event 1
> - I passed A with origTimestamp X. (Low watermark updated to X)  : No results  (this is right )
> - I passed B with origTimestamp X1. (Low watermark updated to X1)  : No results (results should be printed)
> 
> Event 2
> - I passed A with origTimestamp Y. (Low watermark updated to Y)  : Results of Event 1 printed  (this is wrong )
> - I passed B with origTimestamp Y1. (Low watermark updated to Y1)  : No results (results should be printed)
> 
> Event 3
> - I passed A with origTimestamp Z. (Low watermark updated to Z)  : Results of Event 2 printed  (this is wrong )
> - I passed B with origTimestamp Z1. (Low watermark updated to Z1)  : No results (results should be printed)


Re: Flink CEP with event time

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, because event-time only advances if something makes it advance. Basically.
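
One possible way to make event time advance without new events is a watermark that follows the wall clock once the stream has been idle for a while. The sketch below is a plain-Scala model of that idea, not Flink code; the class IdleAwareWatermark and its parameters are hypothetical. In a Flink job, logic like this would presumably live in a periodic watermark assigner (for example an implementation of AssignerWithPeriodicWatermarks), and it assumes that event timestamps and the wall clock advance at roughly the same rate.

```scala
// Plain-Scala model of a watermark that keeps advancing while the stream is idle.
final class IdleAwareWatermark(maxOutOfOrdernessMs: Long, idleTimeoutMs: Long) {
  private var maxEventTs: Long = Long.MinValue
  private var lastEventWallClock: Long = Long.MinValue
  private var lastWatermark: Long = Long.MinValue

  // Record an event's timestamp and the wall-clock time at which it arrived.
  def onEvent(eventTs: Long, nowMs: Long): Unit = {
    maxEventTs = math.max(maxEventTs, eventTs)
    lastEventWallClock = nowMs
  }

  // Called periodically; nowMs is the current wall-clock time.
  def currentWatermark(nowMs: Long): Long = {
    if (maxEventTs != Long.MinValue) {
      val idleFor = nowMs - lastEventWallClock
      // After the timeout, assume nothing older is still in flight and let
      // the watermark follow the wall clock past the last event.
      val extra = if (idleFor > idleTimeoutMs) idleFor else 0L
      // Watermarks must never move backwards.
      lastWatermark = math.max(lastWatermark, maxEventTs + extra - maxOutOfOrdernessMs)
    }
    lastWatermark
  }
}
```

The trade-off is that an event arriving after an idle period can fall behind the advanced watermark and be treated as late.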

> On 4. Jan 2018, at 11:34, shashank agarwal <sh...@gmail.com> wrote:
> 
> But this will be wrong in my case. So I have to wait for the results until I receive next event.


Re: Flink CEP with event time

Posted by shashank agarwal <sh...@gmail.com>.
But this behaviour is wrong for my case: I would have to wait for the results
until I receive the next event.

On Thu, Jan 4, 2018 at 3:53 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Think this is actually working as intended, from your earlier description
> of when results are produced: When you see Event 1.B, the watermark is not
> sufficiently advanced to trigger computation, only when you see Event 2.A
> does the watermark advance and you get a result. This is what I would
> expect to happen.



Re: Flink CEP with event time

Posted by Aljoscha Krettek <al...@apache.org>.
I think this is actually working as intended, given your earlier description of when results are produced: when you see Event 1.B, the watermark has not advanced far enough to trigger computation; only when you see Event 2.A does the watermark advance, and you get a result. This is what I would expect to happen.


> On 3. Jan 2018, at 19:46, shashank agarwal <sh...@gmail.com> wrote:
> 
> @Dawid, I was using 1.3.2, I have checked on 1.4.0 also still facing the same issue.
> 
> 
> @Aljoscha, I have to cover the case where B can come after A from Kafka. How I can achieve this as Event Time is not working. How should I implement this?
> 
>  A followedBy B.
> 
> As I am using kafka source and my event API's using load balancers so sometimes B comes before A. So my CEP doesn't generate any result for those events. 
> 
> I am trying to use Event time like this. Am I am doing anything wrong?
> 
> 
>  kafkaSource.assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>           override def extractTimestamp(event: Event): Long = {
>             try {
>               val originTime = event.origTimestamp.getOrElse("0").toLong
>               if(originTime <= 0)
>                 {
>                   val serverTime = event.serverTimestamp.getOrElse("0").toLong
>                   if(serverTime <= 0)
>                     {
>                       System.currentTimeMillis()
>                     }
>                   else
>                     {
>                       serverTime
>                     }
>                 }
>               else {
>                 originTime
>               }
>             }
>             catch {
>               case e: Exception => Log.error("OriginTimestamp exception occurred", "error", e.printStackTrace)
>                 System.currentTimeMillis()
>             }
>           }
>         }
>       )


Re: Flink CEP with event time

Posted by shashank agarwal <sh...@gmail.com>.
@Dawid, I was using 1.3.2. I have also checked on 1.4.0 and am still facing
the same issue.


@Aljoscha, I have to cover the case where B can come after A from Kafka.
How can I achieve this, given that event time is not working? How should I
implement it?

 A followedBy B.

I am using a Kafka source, and my event APIs sit behind load balancers, so
sometimes B arrives before A. My CEP doesn't generate any result for those
events.

I am trying to use event time like this. Am I doing anything wrong?


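 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
 import org.apache.flink.streaming.api.windowing.time.Time

 kafkaSource.assignTimestampsAndWatermarks(
   new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
     // Prefer the origin timestamp, then the server timestamp; fall back to
     // wall-clock time only when neither is a positive number.
     override def extractTimestamp(event: Event): Long = {
       try {
         val originTime = event.origTimestamp.getOrElse("0").toLong
         if (originTime > 0) originTime
         else {
           val serverTime = event.serverTimestamp.getOrElse("0").toLong
           if (serverTime > 0) serverTime else System.currentTimeMillis()
         }
       } catch {
         case e: Exception =>
           // Log is the application's own logger; its arguments are kept as posted.
           Log.error("OriginTimestamp exception occurred", "error", e.printStackTrace)
           System.currentTimeMillis()
       }
     }
   }
 )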

On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz <wysakowicz.dawid@gmail.com
> wrote:

> Hi shashank,
>
> What version of flink are you using? Is it possible that you are hitting
> this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?
>
> Watermark semantics in CEP was buggy and events were processed only if its
> timestamp was lower than current watermark while it should be lower or
> equal.
>
> Best
> Dawid



Re: Flink CEP with event time

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi shashank,

What version of flink are you using? Is it possible that you are hitting this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?

The watermark semantics in CEP were buggy: an event was processed only if its timestamp was lower than the current watermark, while it should be processed if its timestamp is lower than or equal to it.
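
The difference between the two comparisons can be illustrated with a small plain-Scala model. This is only a sketch of the described semantics, not the CEP operator's code; the Event case class and both function names are hypothetical.

```scala
object WatermarkComparison {
  final case class Event(name: String, timestamp: Long)

  // Behaviour described for the bug: only events strictly below the
  // watermark are released for processing.
  def releasedStrict(buffered: Seq[Event], watermark: Long): Seq[Event] =
    buffered.filter(_.timestamp < watermark)

  // Intended behaviour: events at or below the watermark are released.
  def releasedInclusive(buffered: Seq[Event], watermark: Long): Seq[Event] =
    buffered.filter(_.timestamp <= watermark)
}
```

With the strict comparison, an event whose timestamp equals the current watermark stays buffered until a later watermark arrives.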

Best
Dawid



Re: Flink CEP with event time

Posted by shashank agarwal <sh...@gmail.com>.
The low watermark shows the same value that I pass in the event
("1514994744412") for all the tasks related to that stream. The UI shows
"(No watermark)" for the Kafka source.

So the behaviour for the CEP pattern A followedBy B is as follows:

Event 1
- I passed A with origTimestamp X (low watermark updated to X): no results
  (this is right).
- I passed B with origTimestamp X1 (low watermark updated to X1): no results
  (results should be printed).

Event 2
- I passed A with origTimestamp Y (low watermark updated to Y): results of
  Event 1 printed (this is wrong).
- I passed B with origTimestamp Y1 (low watermark updated to Y1): no results
  (results should be printed).

Event 3
- I passed A with origTimestamp Z (low watermark updated to Z): results of
  Event 2 printed (this is wrong).
- I passed B with origTimestamp Z1 (low watermark updated to Z1): no results
  (results should be printed).

On Wed, Jan 3, 2018 at 8:30 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Can you please check what the input watermark of your operations is? There
> is a metric called "currentLowWatermark" for this.
>
> Best,
> Aljoscha



Re: Flink CEP with event time

Posted by Aljoscha Krettek <al...@apache.org>.
Can you please check what the input watermark of your operations is? There is a metric called "currentLowWatermark" for this.

Best,
Aljoscha

> On 3. Jan 2018, at 15:54, shashank agarwal <sh...@gmail.com> wrote:
> 
> Actually, In Kafka there are other topics also (around 5-6 topics) I am consuming particular topic 'x' which only contains events.  Other topics have different data. 
> 
> I am using two consumers in my program for 2 different topics. in first topic x i am extracting the timestamp from origintimestamp variable in other one i am using system current millis.


Re: Flink CEP with event time

Posted by shashank agarwal <sh...@gmail.com>.
Actually, in Kafka there are other topics as well (around 5-6 topics). I am
consuming one particular topic, 'x', which contains only events. The other
topics hold different data.

I am using two consumers in my program, for two different topics. For the
first topic, x, I extract the timestamp from the origintimestamp variable;
for the other one I use System.currentTimeMillis().

On Wed, Jan 3, 2018 at 8:06 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Ok, but will there be events in all Kafka partitions/topics?



Re: Flink CEP with event time

Posted by Aljoscha Krettek <al...@apache.org>.
Ok, but will there be events in all Kafka partitions/topics?

> On 3. Jan 2018, at 15:33, shashank agarwal <sh...@gmail.com> wrote:
> 
> Hi,
> 
> Yes, Events will always carry a variable OriginTimestamp which I am using in the extractor. I have used fallback also in case of data missing will put System current millis. 
> 
> Still, it's not printing results.
> 
> Best,
> Shashank


Re: Flink CEP with event time

Posted by shashank agarwal <sh...@gmail.com>.
Hi,

Yes, events always carry an OriginTimestamp variable, which I use in the
extractor. As a fallback, when that data is missing, I use
System.currentTimeMillis().

Still, it's not printing results.

Best,
Shashank

On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> Are all the partitions always carrying data that has advancing timestamps?
> When using Event-time the Kafka source (and Flink in general) needs to have
> steady progress in all partitions, otherwise the watermark does not
> advance, which in turn means that processing will be stalled downstream.
>
> Best,
> Aljoscha



Re: Flink CEP with event time

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Are all the partitions always carrying data that has advancing timestamps? When using Event-time the Kafka source (and Flink in general) needs to have steady progress in all partitions, otherwise the watermark does not advance, which in turn means that processing will be stalled downstream.
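
As a rough illustration of why one silent partition stalls everything, the sketch below models the combined watermark as the minimum over all partitions. It is plain Scala, not Flink code, and the object PartitionWatermarks and its combined function are hypothetical names; a partition that has not produced any data yet is modelled as None.

```scala
object PartitionWatermarks {
  // The combined watermark is the minimum of the per-partition watermarks.
  // If any partition has no watermark yet, there is no combined watermark.
  def combined(perPartition: Map[Int, Option[Long]]): Option[Long] =
    if (perPartition.isEmpty || perPartition.values.exists(_.isEmpty)) None
    else Some(perPartition.values.flatten.min)
}
```

In this model a single partition without data, or with timestamps that stop advancing, holds back the watermark seen downstream no matter how far the other partitions have progressed.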

Best,
Aljoscha

> On 3. Jan 2018, at 14:29, shashank agarwal <sh...@gmail.com> wrote:
> 
> Hello,
> 
> I have some patterns in my program. For an example,
> 
>  A followedBy B.
> 
> As I am using kafka source and my event API's using load balancers so sometimes B comes before A. So my CEP doesn't generate any result for those events. 
> 
> I have then tried event time and applied "BoundedOutOfOrdernessTimestampExtractor" on kafkasource with extract time from an origin time variable which I have in the event. I am using watermark lateness of 10 seconds in that.
> 
> Now CEP stopped generating results. It's not even generating results where Event B comes after A. I have tried within (10 seconds) in CEP also still not generating results. 
> 
> Am I doing anything wrong?
> 
> I have to cover the case where B can come after A from Kafka.
> 
> -- 
> Thanks Regards
> 
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....