Posted to dev@flink.apache.org by Moiz Motorwala <mo...@gmail.com> on 2018/11/22 06:26:05 UTC

Exit pattern matching

Hi Flink Team,

First, I would like to thank you for such a wonderful tool; I appreciate
your efforts.
I am developing an application that processes a stream of geolocations
and detects a geofence entry when exactly 3 consecutive events are inside
the geofence. The code is something like this:

Pattern<KafkaLocationEvent, ?> geofencePattern =
        Pattern.<KafkaLocationEvent>begin("matched-events")
                .where(new IterativeCondition<KafkaLocationEvent>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(KafkaLocationEvent value,
                            Context<KafkaLocationEvent> ctx) throws Exception {
                        return distanceInMts < geofenceRadiusInMts;
                    }
                }).times(3).consecutive();

// the stream's event type must match the pattern's event type
PatternStream<KafkaLocationEvent> tempPatternStream =
        CEP.pattern(messageStream, geofencePattern);

// select matching patterns and generate alerts
DataStream<String> alerts =
        tempPatternStream.select(new GeofenceAlertPatternSelectFunction());

The issue is that alerts are generated again when a 4th event arrives
inside the geofence, and so on for every later event. I just want to
generate an alert once, at the first pattern match, and exit the pattern
after that.

I could not find a relevant function for this in the docs. Please help.
Thanks in advance.

-- 
*Regards,*

*Moiz Motorwala*

Re: Exit pattern matching

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Moiz,

What do you mean by "exit pattern"? Do you mean that you want each event to
belong to only a single match? If so, I think you can achieve that with the
AfterMatchSkipStrategy.skipPastLastEvent() (SKIP_PAST_LAST_EVENT) strategy [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/libs/cep.html#after-match-skip-strategy
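
To make the difference concrete, here is a minimal plain-Java sketch that
only simulates the matching behaviour; it does not use Flink at all, and
the class and method names (SkipStrategyDemo, matchStarts) are made up for
illustration. In Flink CEP itself the strategy is passed as the second
argument to Pattern.begin, for example
Pattern.<KafkaLocationEvent>begin("matched-events", AfterMatchSkipStrategy.skipPastLastEvent()).
The sketch assumes each array element says whether an event was inside the
geofence, and it models "no skipping" as every window of 3 consecutive
inside-events producing a match.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of matching "needed" consecutive
// inside-geofence events with and without skipping past the last matched event.
class SkipStrategyDemo {

    // Returns the start index of every match.
    // skipPastLastEvent == false: each event may start its own match (no skipping).
    // skipPastLastEvent == true: after a match, the next match can only start
    // after the last event of that match.
    static List<Integer> matchStarts(boolean[] inside, int needed, boolean skipPastLastEvent) {
        List<Integer> starts = new ArrayList<>();
        int i = 0;
        while (i + needed <= inside.length) {
            boolean allInside = true;
            for (int j = i; j < i + needed; j++) {
                if (!inside[j]) {
                    allInside = false;
                    break;
                }
            }
            if (allInside) {
                starts.add(i);
                i += skipPastLastEvent ? needed : 1;
            } else {
                i++;
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // six consecutive events, all inside the geofence
        boolean[] inside = {true, true, true, true, true, true};
        System.out.println(matchStarts(inside, 3, false)); // [0, 1, 2, 3]
        System.out.println(matchStarts(inside, 3, true));  // [0, 3]
    }
}
```

Under these assumptions, skipping past the last event removes the
overlapping matches (events 2-4 and 3-5), but a fresh match can still
begin with the 4th event once events 4-6 are all inside. If the alert must
fire only once per geofence entry, additional logic after the pattern
(for example keyed state that remembers an alert was already sent) would
still be needed.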
