Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2017/05/26 12:25:55 UTC

No Alerts with FlinkCEP

Hi,

I started exploring Flink CEP a day ago and thought I could use it to
build a simple event processor. For that I looked at Till's CEP examples
and some other articles.

I have two questions I would like to ask:

*Part 1:*

I came up with the following code, but it is not working as expected.

///**************** Main ******************///


    FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
            "testTopic",
            new SimpleStringSchema(),
            props);

    DataStream<String> input = env.addSource(consumer);
    LOG.info("About to process events");
    DataStream<ReadEventType> events =
            input
                    //.map(s -> s.f1)
                    .map(new MapStringToRRE())
                    .filter(Objects::nonNull);

    //events.print();

    DataStream<ReadEventType> partitionedInput = events
            .keyBy((KeySelector<ReadEventType, String>) value ->
                    value.getRawTransactionItem().getChargedAccount());

    Pattern<ReadEventType, ?> pattern = Pattern.<ReadEventType>begin("first")
            .where(event -> event.getFormat() == FormatType.FILE)
            .followedBy("second")
            .where(event -> event.getFormat() == FormatType.FILE)
            .within(Time.seconds(1));

    PatternStream<ReadEventType> patternStream = CEP.pattern(partitionedInput, pattern);

    DataStream<String> alerts = patternStream.select(
            (PatternSelectFunction<ReadEventType, String>) CEPForBAMRRE::createAlert);

    alerts.print();

    env.execute("CEP monitoring job");
  }


///*********** Alert Function returning just concat of txn id ***************///

  private static String createAlert(Map<String, ReadEventType> pattern) {
    return pattern.get("first").getTransactionItem().getUid() + " " +
            pattern.get("second").getTransactionItem().getUid();
  }

///******************* properties for kafka **************///

  private static Properties getDefaultProperties(Properties prop){
    prop.put("group.id", "FlinkCEP");
    prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    prop.put("zookeeper.connect", ZKEEPER);
    prop.put("auto.offset.reset", "earliest");
    return prop;
  }


Since my Kafka topic only sends me events with format type FILE, I
expected to see multiple alerts being raised. But that's not the case: I am
not getting any alerts at the moment.

Can anyone point out what I am doing wrong?

*Part 2:*

Also, my main aim in using CEP is to read from different topics and raise
an alert if a second event is *not* followed by a first event within a given
time interval. How can I achieve this with FlinkCEP? So far I can only see
how to raise an alert when two events do follow each other within a time
interval.


Thanks & Regards,
Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: No Alerts with FlinkCEP

Posted by Biplob Biswas <re...@gmail.com>.
Hi,

Sorry, it does work with the BoundedOutOfOrdernessTimestampExtractor. I had
replayed my events from Kafka, so the older events were also on the bus and
they did not correlate with my new events.

I have now cleaned up my code and restarted it from the beginning, and it works.

Thanks a lot for the help.

Regards,
Biplob




Re: No Alerts with FlinkCEP

Posted by Biplob Biswas <re...@gmail.com>.
Hi Kostas,

Processing time is fine for me at the moment, but since my events already
carry a creation timestamp, and to explore the event-time side of FlinkCEP
further, I went on to evaluate it with event time.

For this I tried both of the following:
1. AscendingTimestampExtractor: with this I get the warning "Timestamp
monotony violated: 1478048406982 < 1478051502295" and no alerts are
generated.

I still expected this behaviour: I am reading from more than two topics,
and I suspect that some events in the second topic (which has only one
partition) are assigned their timestamp at creation but are not pushed to
the Kafka topic immediately. Because of this delay, the events within a
partition are not all in ascending order.

2. BoundedOutOfOrdernessTimestampExtractor: since I realized that events
within a partition can be delayed, I used this timestamp extractor with a
maxOutOfOrderness of 60 seconds.

This gives me no warning, but again I get no alerts. I checked my
partitionedInput stream and the events are there.
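A plain-Java sketch of the watermark arithmetic behind the two extractors, using the two timestamps from the warning. It assumes the ascending extractor keeps its watermark 1 ms behind the highest timestamp seen and the bounded extractor keeps it maxOutOfOrderness behind; the class and method names are hypothetical and this is not Flink's code.

```java
/**
 * Plain-Java model (no Flink dependency) of two watermark rules.
 * Assumption: ascending = highest timestamp seen minus 1 ms,
 * bounded = highest timestamp seen minus a fixed bound.
 */
class WatermarkArithmetic {

    // Watermark under the "timestamps only ever increase" assumption.
    static long ascendingWatermark(long maxTimestampSeen) {
        return maxTimestampSeen - 1;
    }

    // Watermark under a fixed out-of-orderness bound in milliseconds.
    static long boundedWatermark(long maxTimestampSeen, long maxOutOfOrdernessMs) {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    // A watermark W promises that no more elements with timestamp <= W will come,
    // so an element at or below the current watermark arrives too late.
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    public static void main(String[] args) {
        long newest = 1478051502295L;  // highest timestamp seen (from the warning)
        long delayed = 1478048406982L; // element that violated monotony
        long gapMs = newest - delayed; // 3,095,313 ms, roughly 51.6 minutes

        System.out.println("gap ms: " + gapMs);
        System.out.println("late (ascending): " + isLate(delayed, ascendingWatermark(newest)));
        System.out.println("late (60 s bound): " + isLate(delayed, boundedWatermark(newest, 60_000L)));
    }
}
```

With these numbers the delayed element is about 51.6 minutes behind the newest one, so if it arrives after the newer element a 60-second bound would not cover it either. That is consistent with the later finding in this thread that older, replayed events were still on the bus.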

Following is an excerpt of the code I am using:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    // configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
            Arrays.asList("getm", "msgm", "tte"),
            new StringSerializerToEvent(),
            props);

/*    kafkaSource.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {

      private static final long serialVersionUID = -7228487240278428374L;

      @Override
      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });*/

    kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<BAMEvent>() {
      private static final long serialVersionUID = -4358312835839141890L;

      @Override
      public long extractAscendingTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });

    DataStream<BAMEvent> events = env.addSource(kafkaSource);




Re: No Alerts with FlinkCEP

Posted by Kostas Kloudas <k....@data-artisans.com>.
You could also remove the autoWatermarkInterval setting if you are satisfied
with processing time.

Keep in mind, though, that with processing time, elements are timed by when
they arrive at the operator. This means that replaying the same stream can
give different results.

If you care about time handling and the reproducibility of your results, you
could use event time or ingestion time (with the latter, each element gets
its timestamp based on when it was first ingested by your Flink job).

For more information on “time”, you can look here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html

Cheers,
Kostas

> On May 31, 2017, at 2:29 PM, Biplob Biswas <re...@gmail.com> wrote:


Re: No Alerts with FlinkCEP

Posted by Biplob Biswas <re...@gmail.com>.
Hi Kostas,

My application had no timestamp extractor, and my events had no timestamp.
Still, I was processing them in event time, which is probably why the job
was blocked.

I have now removed the line that sets the time characteristic to event time,
and it works.

For example:

Previously:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);

And my event class had no timestamp field:

public class BAMEvent {
  private String id;
  private String eventName;
  private String eventId;
  private List<String> correlationID;

//Getters

//Setters
}


Now:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/* env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); */
env.getConfig().setAutoWatermarkInterval(1000);
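A toy plain-Java model of why an event-time job without timestamps or watermarks stays silent instead of failing. It assumes, as a simplification, that the CEP operator buffers elements in event time and only processes them once a watermark passes them; EventTimeBuffer and its methods are hypothetical, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Toy model (not Flink's implementation) of an event-time operator:
 * elements are buffered by timestamp and released only when a watermark
 * passes them. If no watermark ever arrives, nothing is emitted and no
 * error is raised.
 */
class EventTimeBuffer {
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    // On arrival an element is only stored; nothing is processed yet.
    void onElement(long timestamp) {
        buffered.add(timestamp);
    }

    // A watermark releases, in timestamp order, every buffered element at or below it.
    List<Long> onWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= watermark) {
            released.add(buffered.poll());
        }
        return released;
    }

    int pending() {
        return buffered.size();
    }

    public static void main(String[] args) {
        EventTimeBuffer buffer = new EventTimeBuffer();
        buffer.onElement(3000L);
        buffer.onElement(1000L);
        buffer.onElement(1500L);
        // Without any watermark, all three elements just sit in the buffer.
        System.out.println("pending without watermark: " + buffer.pending());
        // Once a watermark arrives, the elements it covers are released in order.
        System.out.println("released at watermark 2000: " + buffer.onWatermark(2000L));
        System.out.println("still pending: " + buffer.pending());
    }
}
```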




Re: No Alerts with FlinkCEP

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Biplob,

Great to hear that everything worked out and that you are not blocked!

Regarding the timestamp assignment issue: you mean that you specified no
timestamp extractor in your job and all your elements had a Long.MIN_VALUE
timestamp, right?

Kostas

> On May 31, 2017, at 1:28 PM, Biplob Biswas <re...@gmail.com> wrote:


Re: No Alerts with FlinkCEP

Posted by Biplob Biswas <re...@gmail.com>.
Hi Dawid,

Thanks for the response. Timeout patterns work like a charm; I had seen them
before but didn't understand what they do, so thanks for explaining that.

My problem with no alerts is also solved now.

The problem was that I was using event time for processing, whereas my
events didn't carry any timestamp. So I removed that part (I am assuming
it's working on processing time now), and now FlinkCEP works like a charm.

If a future version logged something about this, the issue would be easier
and faster to debug. Because nothing was printed, I had to rely on trial and
error.

Anyway, FlinkCEP just saved me days of work (we were considering Spark
Structured Streaming with windowing for the equivalent CEP logic).

Thanks a lot for all the help; I have never been disappointed by the Flink
user group. :)

Regards,
Biplob




Re: No Alerts with FlinkCEP

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Biplob,

The message you mention should not be a problem here. It just says that your
events can't be used as POJOs (e.g. you can't use keyBy("chargedAccount")).
Your code seems fine, and without some example data I think it will be hard
to help you.

As for Part 2 of your first email: in 1.3 we introduced the NOT pattern, but
right now it does not support time ranges in which a pattern should not
occur. What you can do, though, is specify a positive pattern like
("a" -> "b" within 1s) and select the timed-out patterns, which are exactly
the ones you want to trigger alerts for.
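A plain-Java sketch of Dawid's suggestion: treat the timed-out partial matches of the positive pattern ("a" -> "b" within 1s) as the alerts. It assumes a single key, events already sorted by timestamp, and that a match needs the "b" strictly less than the window after the "a"; the Event class and the type tags are hypothetical, and the exact boundary handling is a choice of this sketch rather than documented Flink behaviour.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (no Flink dependency): every "a" that is NOT completed
 * by a later "b" within windowMs becomes an alert.
 */
class TimeoutAlerts {

    // Minimal event: a type tag ("a" or "b") and an event-time timestamp in ms.
    static final class Event {
        final String type;
        final long timestamp;

        Event(String type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    // Returns timestamps of "a" events not followed by a "b" less than windowMs later.
    // Assumes one key and input sorted by timestamp; an "a" near the end of the list
    // is reported as if its window had already expired.
    static List<Long> timedOutStarts(List<Event> events, long windowMs) {
        List<Long> alerts = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Event start = events.get(i);
            if (!start.type.equals("a")) {
                continue;
            }
            boolean completed = false;
            for (int j = i + 1; j < events.size(); j++) {
                Event next = events.get(j);
                if (next.timestamp - start.timestamp >= windowMs) {
                    break; // sorted input: every later event is outside the window too
                }
                if (next.type.equals("b")) {
                    completed = true;
                    break;
                }
            }
            if (!completed) {
                alerts.add(start.timestamp);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 0L));    // completed by the b at 500 ms
        events.add(new Event("b", 500L));
        events.add(new Event("a", 1000L)); // next event is 2 s later: times out
        events.add(new Event("a", 3000L)); // completed by the b at 3900 ms
        events.add(new Event("b", 3900L));
        System.out.println("alerts for a-events at: " + timedOutStarts(events, 1000L));
    }
}
```

In a Flink job, the timed-out cases would instead reach the PatternTimeoutFunction passed to PatternStream.select, as in the collection-based test in this thread.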

Re: No Alerts with FlinkCEP

Posted by Biplob Biswas <re...@gmail.com>.
Hello Kostas,

I made the necessary changes and adapted the code to 1.4-SNAPSHOT. I still
see similar behaviour: the data is there in the partitionedInput stream, but
no alerts are being raised.

I see the following INFO log on my console:

INFO  o.a.f.a.java.typeutils.TypeExtractor - class com.project.generated.xjc.ReadEventType is not a valid POJO type because not all fields are valid POJO fields.

One thing to mention explicitly: I am receiving strings over the Kafka
topics and then deserializing each string into the POJO generated from the
event's XSD. But I am still not sure how that would change the way alerts
are generated.




Re: No Alerts with FlinkCEP

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Biplob,

For the 1.4 version, the input of the select function has changed to a list
of matching events per pattern name (Map<String, List<IN>> instead of
Map<String, IN>), as we have added quantifiers.

Also, the FilterFunction has been replaced by SimpleCondition.

The documentation is lagging a bit behind, but it is coming soon.

As for your code, I will have to dig into it a bit more.

Kostas
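A plain-Java sketch of how the createAlert helper from the first post changes under the list-valued signature described above. As a hypothetical simplification, String uids stand in for ReadEventType, so only the change in map access is shown.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of createAlert for a Map<String, List<...>> input.
 * Assumption: Strings stand in for ReadEventType.
 */
class ListBasedAlert {

    // Each pattern name maps to the list of events matched for it. A step without
    // a quantifier (like "first" and "second") matches exactly one event, so take index 0.
    static String createAlert(Map<String, List<String>> pattern) {
        String firstUid = pattern.get("first").get(0);
        String secondUid = pattern.get("second").get(0);
        return firstUid + " " + secondUid;
    }

    public static void main(String[] args) {
        Map<String, List<String>> match = new HashMap<>();
        match.put("first", Arrays.asList("uid-1"));
        match.put("second", Arrays.asList("uid-2"));
        System.out.println(createAlert(match)); // prints "uid-1 uid-2"
    }
}
```

With the real event type, the same get(0) calls would apply to Map<String, List<ReadEventType>> before calling getTransactionItem().getUid().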

> On May 26, 2017, at 4:07 PM, Biplob Biswas <re...@gmail.com> wrote:


Re: No Alerts with FlinkCEP

Posted by Biplob Biswas <re...@gmail.com>.
Hello Kostas,

Thanks for the suggestions.

I checked, and my events do show up when I print the partitionedInput
stream, but there is still nothing on the alert side. I also checked the
Flink UI for backpressure and everything seems normal (the Kafka topic
carries at most 1000 events per second, so I don't think backpressure is the
problem, at least I expect not).

I haven't run the test with my own data as a collection, but I tried the
following example and I did get alerts:


// CEPTest using collection

    List<MyEvent> inputElements = new ArrayList<>();
    inputElements.add(new MyEvent(1, 'a', 1, 1));
    inputElements.add(new MyEvent(1, 'b', 1, 2));
    inputElements.add(new MyEvent(1, 'b', 2, 2));
    inputElements.add(new MyEvent(1, 'b', 3, 5));

    Pattern<MyEvent, ?> pattern = Pattern.<MyEvent>begin("a").where(new FilterFunction<MyEvent>() {
      private static final long serialVersionUID = 7219646616484327688L;

      @Override
      public boolean filter(MyEvent myEvent) throws Exception {
        return myEvent.getPayload() == 'a';
      }
    }).followedBy("b").where(new FilterFunction<MyEvent>() {
      private static final long serialVersionUID = 7219646616484327688L;

      @Override
      public boolean filter(MyEvent myEvent) throws Exception {
        return myEvent.getPayload() == 'b';
      }
    }).within(Time.seconds(1)); //.within(Time.milliseconds(2L));

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    DataStream<MyEvent> input = env.fromCollection(inputElements)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
          private static final long serialVersionUID = -6619787346214245526L;

          @Override
          public long extractAscendingTimestamp(MyEvent myEvent) {
            return myEvent.getTimestamp();
          }
        });

    PatternStream<MyEvent> patternStream = CEP.pattern(input.keyBy(new KeySelector<MyEvent, Long>() {
      private static final long serialVersionUID = 6928745840509494198L;

      @Override
      public Long getKey(MyEvent myEvent) throws Exception {
        return myEvent.getId();
      }
    }), pattern);

    patternStream.select(new PatternTimeoutFunction<MyEvent, String>() {
      private static final long serialVersionUID = 300759199619789416L;

      @Override
      public String timeout(Map<String, MyEvent> map, long l) throws Exception {
        return map.toString() + " @ " + l;
      }
    }, new PatternSelectFunction<MyEvent, String>() {
      private static final long serialVersionUID = 732172159423132724L;

      @Override
      public String select(Map<String, MyEvent> map) throws Exception {
        return map.toString();
      }
    }).print();



Along with that, I upgraded my Flink Maven project to 1.4-SNAPSHOT, and
there seems to be a problem there.

According to the CEP documentation
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html>:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        return new OUT(startEvent, endEvent);
    }
}

but when I implement it, the interface expects a list of events for each
pattern name:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
  @Override
  public OUT select(Map<String, List<IN>> map) throws Exception {
    return null;
  }
}

I'm not sure what I am doing wrong here; any input would be really helpful.

Regards,
Biplob






Re: No Alerts with FlinkCEP

Posted by Kostas Kloudas <k....@data-artisans.com>.
One additional comment: from your code it seems you are using Flink 1.2.
It would be worth upgrading to 1.3. The updated CEP library includes a lot of
new features and bug fixes.

Cheers,
Kostas

> On May 26, 2017, at 3:33 PM, Kostas Kloudas <k....@data-artisans.com> wrote:


Re: No Alerts with FlinkCEP

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Biplob,

From a first scan of the code I cannot find anything fishy.

You are working with processing time, given that you do not specify any
time characteristic, right?

In this case, if you print your partitionedInput stream, do you see
elements flowing as expected?

If elements are flowing normally, is any backpressure created, or do you
keep reading records from Kafka uninterrupted? I am asking to see whether
the CEP operator is doing something that blocks the pipeline or whether it
just discards the elements.

It could also be worth adding a source with artificial elements
(env.fromCollection(…)) to see whether everything works normally in that case.

Kostas

> On May 26, 2017, at 2:25 PM, Biplob Biswas <re...@gmail.com> wrote: