Posted to user@flink.apache.org by Dawid Wysakowicz <dw...@apache.org> on 2020/08/11 08:01:06 UTC

Re: Matching largest event pattern without duplicates

Hi James,

I think this is not easy to achieve with the CEP library. Adding the
consecutive quantifier to the oneOrMore strategy should eliminate a few
of the unwanted cases from your example (`b:c`, `b`, `a`, `c`), but it
would not eliminate `c:a`. The problem is that you need to skip to the
first duplicate in the chain, and there is no method that would let you
do such a "conditional jump".

I'd recommend implementing the logic with e.g. a custom FlatMap function
and a ListState[1], where you could keep the sequence in the state and
prune the leading elements up until the duplicate.
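Just to illustrate the idea, here is a plain-Java sketch of the pruning
step (not Flink code; the names are illustrative, and in the real job the
`sequence` list would be kept in ListState inside the FlatMap function):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the pruning logic a FlatMap + ListState solution would perform.
public class DedupSequence {
  private final List<String> sequence = new ArrayList<>();

  // Process one event and return the current duplicate-free window.
  public String process(String event) {
    int dup = sequence.indexOf(event);
    if (dup >= 0) {
      // Prune the leading elements up to and including the first duplicate.
      sequence.subList(0, dup + 1).clear();
    }
    sequence.add(event);
    return String.join(":", sequence);
  }

  public static void main(String[] args) {
    DedupSequence d = new DedupSequence();
    for (String e : new String[] {"a", "a", "b", "c", "a", "c"}) {
      System.out.println(d.process(e));
    }
  }
}
```

Applied to your example input (a, a, b, c, a, c) this emits exactly the
six lines you expect: a, a, a:b, a:b:c, b:c:a, a:c.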

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state

On 29/07/2020 19:03, James Buchan wrote:
> Hey all,
>
> I'm trying to complete a small POC to see if Flink is suitable for our
> needs and the first step is to evaluate a stream of events and
> continually output the largest active group that does not contain
> duplicates.  I'm attempting to do this with the CEP pattern matching.
>
> For example, for the following input:
>
> >a
> >a
> >b
> >c
> >a
> >c
>
> I would expect an output of:
>
> a
> a
> a:b
> a:b:c
> b:c:a
> a:c
>
> The closest I've been able to get is the method shown below, which returns:
>
> a
> a
> a:b
> a:b:c
> b:c:a
> b:c
> b
> c:a
> a:c
> a
> c
>
> When the initial pattern continues to grow it looks good, but as soon
> as a duplicate is seen I receive more results than I would like.  This
> example uses the skipToFirst strategy; I thought other strategies would
> be more helpful, but they produced less desirable results.
>
> This feels like it should be easily solvable but I've not been able to
> find the right combination of options to get it working.  Any
> assistance would be appreciated.
>
> Here are the details of my latest method:
>
> public static void cep() throws Exception {
>   log.info("Initializing cep processor");
>   String inputTopic = "inputTopic";
>   String outputTopic = "outputTopic";
>   String consumerGroup = "testGroup";
>   String address = "localhost:9092";
>   StreamExecutionEnvironment environment =
>       StreamExecutionEnvironment.getExecutionEnvironment();
>
>   log.info("Creating consumer");
>   FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
>       inputTopic, address, consumerGroup);
>   flinkKafkaConsumer.setStartFromLatest();
>
>   log.info("Creating producer");
>   FlinkKafkaProducer011<String> flinkKafkaProducer =
>       createStringProducer(outputTopic, address);
>
>   log.info("Configuring sources");
>   DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
>
>   log.info("Processing kafka messages");
>   AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("start");
>   Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
>       .oneOrMore()
>       .until(new IterativeCondition<>() {
>         @Override
>         public boolean filter(String s, Context<String> context) throws Exception {
>           return StreamSupport.stream(context.getEventsForPattern("start").spliterator(), false)
>               .anyMatch(state -> state.equals(s));
>         }
>       });
>   PatternStream<String> patternStream = CEP.pattern(stringInputStream, pattern);
>   DataStream<String> result = patternStream.select(
>       (PatternSelectFunction<String, String>) map ->
>           String.format("Evaluated these states %s", String.join(":", map.get("start")))
>   );
>   result.addSink(flinkKafkaProducer);
>   environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>   environment.execute("Flink cep Example");
> }
>
>
> Thanks!
>
> -James