Posted to user@flink.apache.org by James Buchan <ja...@googlemail.com> on 2020/07/29 17:03:03 UTC

Matching largest event pattern without duplicates

Hey all,

I'm trying to complete a small POC to see if Flink is suitable for our
needs and the first step is to evaluate a stream of events and continually
output the largest active group that does not contain duplicates.  I'm
attempting to do this with the CEP pattern matching.

For example, for the following input:

>a
>a
>b
>c
>a
>c

I would expect an output of:

a
a
a:b
a:b:c
b:c:a
a:c

The closest I've been able to get is the pattern below, which returns:

a
a
a:b
a:b:c
b:c:a
b:c
b
c:a
a:c
a
c

While the initial pattern is still growing the output looks good, but as soon as
a duplicate is seen I receive more results than I would like.  This example
uses the skipToFirst strategy; I expected other strategies to be more
helpful, but they produced less desirable results.

This feels like it should be easily solvable but I've not been able to find
the right combination of options to get it working.  Any assistance would
be appreciated.

Here's the details of my latest method:

public static void cep() throws Exception {
  log.info("Initializing cep processor");

  String inputTopic = "inputTopic";
  String outputTopic = "outputTopic";
  String consumerGroup = "testGroup";
  String address = "localhost:9092";

  StreamExecutionEnvironment environment =
      StreamExecutionEnvironment.getExecutionEnvironment();

  log.info("Creating consumer");
  FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
  flinkKafkaConsumer.setStartFromLatest();

  log.info("Creating producer");
  FlinkKafkaProducer011<String> flinkKafkaProducer =
      createStringProducer(outputTopic, address);

  log.info("Configuring sources");
  DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

  log.info("Processing kafka messages");
  AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("start");
  Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
      .oneOrMore()
      .until(new IterativeCondition<>() {
        @Override
        public boolean filter(String s, Context<String> context) throws Exception {
          // Stop matching once the incoming event already appears in the match.
          return StreamSupport.stream(
                  context.getEventsForPattern("start").spliterator(), false)
              .anyMatch(state -> state.equals(s));
        }
      });

  PatternStream<String> patternStream = CEP.pattern(stringInputStream, pattern);
  DataStream<String> result = patternStream.select(
      (PatternSelectFunction<String, String>) map ->
          String.format("Evaluated these states %s", String.join(":", map.get("start"))));
  result.addSink(flinkKafkaProducer);

  environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  environment.execute("Flink cep Example");
}



Thanks!

-James

Re: Between Flink 1.9 and 1.11 - any behavior change for web.upload.dir

Posted by Avijit Saha <av...@gmail.com>.
Hello,

Has there been any change in behavior related to "web.upload.dir"
between Flink 1.9 and 1.11?

I have a failure case where, when building an image using
"flink:1.11.0-scala_2.12" in the Dockerfile, job manager job submissions
fail with the following exception, but the same flow works fine (for the
same underlying code image) when using "flink:1.9.1-scala_2.12".

This is the exception stack trace for 1.11, which is not seen with 1.9:
------------------------------------------------------------------------------------------
Caused by: java.nio.file.FileAlreadyExistsException: /opt/flink/flink-web-upload
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) ~[?:1.8.0_262]
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_262]
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_262]
        at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) ~[?:1.8.0_262]
        at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_262]
        at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) ~[?:1.8.0_262]
        at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_262]
        at org.apache.flink.runtime.rest.RestServerEndpoint.checkAndCreateUploadDir(RestServerEndpoint.java:478) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rest.RestServerEndpoint.createUploadDir(RestServerEndpoint.java:462) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rest.RestServerEndpoint.<init>(RestServerEndpoint.java:114) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.<init>(WebMonitorEndpoint.java:200) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.<init>(DispatcherRestEndpoint.java:68) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rest.SessionRestEndpointFactory.createRestEndpoint(SessionRestEndpointFactory.java:63) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:152) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        ... 2 more


Re: Matching largest event pattern without duplicates

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi James,

I think this is not easy to achieve with the CEP library. Adding the
consecutive quantifier to oneOrMore should eliminate a few of the
unwanted cases from your example (`b:c`, `b`, `a`, `c`), but it would
not eliminate the `c:a`. The problem is that you need to skip to the
first duplicate in the chain, and there is no method that would let you
do such a "conditional jump".

I'd recommend implementing the logic with e.g. a custom FlatMap function
and a ListState[1], where you could keep the sequence in the state and
prune the leading elements up until the duplicate.
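To make the idea concrete, here is a minimal, non-Flink sketch of that pruning logic (the class and method names are mine, purely illustrative). In an actual job the `window` list would live in a ListState inside a RichFlatMapFunction keyed appropriately, with `process` as the body of `flatMap`:

```java
import java.util.ArrayList;
import java.util.List;

class LargestDistinctWindow {
    // In a Flink operator this would be a ListState<String>.
    private final List<String> window = new ArrayList<>();

    /**
     * Processes one event and returns the current largest group
     * without duplicates, joined with ":".
     */
    public String process(String event) {
        int dup = window.indexOf(event);
        if (dup >= 0) {
            // Prune the leading elements up to and including the duplicate.
            window.subList(0, dup + 1).clear();
        }
        window.add(event);
        return String.join(":", window);
    }

    public static void main(String[] args) {
        LargestDistinctWindow w = new LargestDistinctWindow();
        for (String e : new String[] {"a", "a", "b", "c", "a", "c"}) {
            System.out.println(w.process(e));
        }
    }
}
```

Running this over the input from the original mail (a, a, b, c, a, c) emits exactly the expected output: a, a, a:b, a:b:c, b:c:a, a:c.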

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state
