Posted to user@flink.apache.org by RayL <ra...@cisco.com> on 2019/03/20 09:35:29 UTC

Flink CEP pattern design question

Currently I'm designing a CEP pattern to satisfy our business needs.
Basically, there are two events; let's call them a and b.
Both a and b can have zero or multiple entries in the log.
For input {a,b1,b2}, I want to get the output {a,b1,b2}.
For input {b1,b2}, I want to get the output {b1,b2} after a timeout
period (10 seconds).
For input {a1,a2}, I want to get the output {a1,a2} after a timeout
period (10 seconds).
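A minimal plain-Scala model of the requirement above may help pin it down. It has no Flink dependency, and it assumes (an assumption, not stated above) that every a or b event arriving within 10 seconds of the first event of a group belongs to that group. It also assumes that the group is emitted once that window closes, and it does not enforce that a events come before b events. `Event` and `groupWithTimeout` are hypothetical names.

```scala
// Plain-Scala model of the requested grouping (no Flink dependency).
// Hypothetical names: Event, GroupingModel.groupWithTimeout.
final case class Event(name: String, timestamp: Long)

object GroupingModel {
  // Walks the events in timestamp order. The first event of a group opens a
  // window of `timeoutMs`; later events arriving before the window end join
  // that group. The first event at or after the window end (or the end of
  // the input) closes the group. Ordering of a vs b is NOT checked here.
  def groupWithTimeout(events: Seq[Event], timeoutMs: Long): List[List[Event]] = {
    val sorted = events.sortBy(_.timestamp)
    val groups = scala.collection.mutable.ListBuffer.empty[List[Event]]
    var current = List.empty[Event] // built in reverse for O(1) prepends
    var windowEnd = Long.MinValue
    for (e <- sorted) {
      if (current.nonEmpty && e.timestamp >= windowEnd) {
        groups += current.reverse // window closed: emit the finished group
        current = Nil
      }
      if (current.isEmpty) windowEnd = e.timestamp + timeoutMs // open a new window
      current = e :: current
    }
    if (current.nonEmpty) groups += current.reverse
    groups.toList
  }
}
```

With a 10000 ms timeout, {a,b1,b2} and {b1,b2} (all within 10 seconds) each come out as one group, while two a events 15 seconds apart come out as two separate groups.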

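Currently, my code looks like this:

import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

// Event is assumed to expose getName
val pattern = Pattern
  .begin[Event]("start")
  .where(_.getName == "a")
  .oneOrMore.optional
  .followedBy("end")
  .where(_.getName == "b")
  .oneOrMore.optional
  .within(Time.seconds(10))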

For input {a,b1,b2}, I got the outputs {a} {a,b1} {a,b1,b2} {b1,b2} {b2}.
For input {b1,b2}, I got the outputs {b1,b2} {b1} {b2}.
For input {a1,a2}, I got the outputs {a1,a2} {a1} {a2}.

I also tried the SKIP_PAST_LAST_EVENT skip strategy, but it didn't work.

Please advise me on how to design this pattern to meet my needs.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink CEP pattern design question

Posted by Dawid Wysakowicz <dw...@apache.org>.
I think what you are asking for is something like a timing-out greedy[1]
quantifier, which is not supported.

As a rather dirty workaround, you could try something like:

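import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

Pattern
  .begin[Event]("start")
  .where(_.getName == "a")
  .oneOrMore.optional.greedy
  .followedBy("end")
  .where(_.getName == "b")
  .oneOrMore.optional.greedy
  // "dummy" never matches, so every partial match eventually times out
  .followedBy("dummy")
  .where(_ => false)
  .within(Time.seconds(10))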

and then work only with the timed-out matches. Another option is to
implement that logic yourself with a ProcessFunction[2].
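Working with the timed-out matches means turning each partial match back into one sequence such as {a,b1,b2}. A minimal sketch of that step follows, in plain Scala. It assumes a partial match arrives as a `Map[String, Iterable[Event]]` keyed by pattern name, and that an optional pattern that matched nothing may be absent from the map. `MatchFlattening` and `Event` are hypothetical names, and `getOrElse` keeps a missing "start" or "end" from failing.

```scala
// Hypothetical helper: flattens one (timed-out) CEP match into a single list.
// Assumes the match is keyed by pattern name, as Map[String, Iterable[Event]].
final case class Event(name: String, timestamp: Long)

object MatchFlattening {
  // Pattern names from the workaround above, in match order.
  // "dummy" never matches (its condition is always false), so it is omitted.
  val PatternOrder: List[String] = List("start", "end")

  // Concatenates the events of each pattern in order; a pattern that matched
  // nothing (absent key) contributes no events.
  def flatten(matched: Map[String, Iterable[Event]]): List[Event] =
    PatternOrder.flatMap(name => matched.getOrElse(name, Nil).toList)
}
```

In Flink 1.7's Scala API, a function like this could probably serve as the timeout function passed to `PatternStream.select(outputTag)(timeoutFn)(selectFn)`, with the results read back via `getSideOutput(outputTag)`. Check the CEP documentation[1] for the exact signature.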

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html#quantifiers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html#process-function-low-level-operations
