Posted to user@flink.apache.org by Ajay Krishna <aj...@gmail.com> on 2017/09/28 00:57:01 UTC

Issue with CEP library

Hi,

I've only been working with Flink for the past 2 weeks on a project and am
trying to use the CEP library on sensor data. I am using Flink version
1.3.2 with a Kafka source (KafkaSource9). I am running Flink on a 3-node
AWS cluster with 8G of RAM running Ubuntu 16.04. From the Flink dashboard,
I see that I have 2 TaskManagers and 4 task slots.

What I observe is the following. The input to Kafka is a JSON string and
when parsed on the Flink side, it looks like this:

(101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)

I use a Tuple8 to capture the parsed data. The first field is home_id. The
time characteristic is set to EventTime and I have an
AscendingTimestampExtractor using the timestamp field. The parallelism
for the execution environment is set to 4. I have a rather simple event
that I am trying to capture:

    DataStream<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>> cepMapByHomeId =
            cepMap.keyBy(0);

    //cepMapByHomeId.print();

    Pattern<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>, ?> cep1 =
            Pattern.<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>>begin("start")
                    .where(new OverLowThreshold())
                    .followedBy("end")
                    .where(new OverHighThreshold());

    PatternStream<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>> patternStream =
            CEP.pattern(cepMapByHomeId.keyBy(0), cep1);

    DataStream<Tuple7<Integer,Date,Date,String,String,Float,Float>> alerts =
            patternStream.select(new PackageCapturedEvents());

The pattern checks whether the 7th field in the Tuple8 goes over 12 and
then over 16. The output of the pattern looks like this:

(201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
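
For readers unfamiliar with the pattern, the matching rule described above
(a reading over a low threshold, followed later, not necessarily
immediately, by a reading over a high threshold) can be sketched in plain
Java. This is only an illustration and does not use the Flink CEP API. The
class name, the method names, and the inclusive comparison (>=) are
assumptions; the bodies of OverLowThreshold and OverHighThreshold are not
shown in the message.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; hypothetical names, not the Flink CEP API.
final class ThresholdPatternSketch {

    // Thresholds taken from the prose ("over 12 and then over 16").
    static final float LOW = 12.0f;
    static final float HIGH = 16.0f;

    // Assumed stand-ins for OverLowThreshold / OverHighThreshold.
    // Whether the real conditions are inclusive is not stated in the thread.
    static boolean overLow(float reading) {
        return reading >= LOW;
    }

    static boolean overHigh(float reading) {
        return reading >= HIGH;
    }

    // Returns {startIndex, endIndex} pairs: each reading that passes overLow
    // is paired with the first later reading that passes overHigh, skipping
    // any non-matching readings in between.
    static List<int[]> match(float[] readings) {
        List<int[]> matches = new ArrayList<>();
        for (int start = 0; start < readings.length; start++) {
            if (!overLow(readings[start])) {
                continue;
            }
            for (int end = start + 1; end < readings.length; end++) {
                if (overHigh(readings[end])) {
                    matches.add(new int[] {start, end});
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        float[] readings = {12.0f, 0.0235f, 0.1f, 16.0f};
        for (int[] m : match(readings)) {
            System.out.println(m[0] + " -> " + m[1]); // prints "0 -> 3"
        }
    }
}
```

In the real job this rule is evaluated separately for each home_id, because
the stream is keyed before the pattern is applied.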

On the Kafka producer side, I am trying to send simulated data for around
100 homes, so the home_id would go from 0-100 and the input is keyed by
home_id. I have about 10 partitions in Kafka. The producer just loops
through a CSV file with a delay of about 100 ms between 2 rows of the
file. The data is exactly the same for all 100 of the CSV files except for
home_id and the lat & long information. The timestamp is incremented in
steps of 1 sec. I start multiple processes to simulate data from different
homes.

THE PROBLEM:

Flink completely misses capturing events for a large subset of the input
data. I barely see the events for about 4-5 of the home_id values. I print
the stream before and after applying the pattern, and I see all home_ids
before and only a tiny subset after. Since the data is exactly the same, I
expect all home_ids to be captured and written to my sink, which is
Cassandra in this case. I've looked through all available docs and examples
but cannot seem to find a fix for the problem.

I would really appreciate some guidance on how to understand and fix this.


Thank you,

Ajay

Re: Issue with CEP library

Posted by Ajay Krishna <aj...@gmail.com>.
Hi Kostas,

I noticed that you commented on FLINK-7549 and FLINK-7606. I have been
monitoring both of these JIRAs.

I have always used event time as the time characteristic, as you suggested,
but I continue to see patterns not getting detected. Could you help shed
more light on this? I shared some code earlier in this thread that
extracts the event time from my incoming stream. Let me know if you need
more information.

Thank you,
Ajay


On Sat, Sep 30, 2017 at 11:18 PM, Kostas Kloudas <
k.kloudas@data-artisans.com> wrote:

> Hi Ajay,
>
> I will definitely have a look to see what is happening.
> And I will keep you posted.
>
> Thanks for investigating it.
>
> Kostas

Re: Issue with CEP library

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Ajay,

I will definitely have a look to see what is happening.
And I will keep you posted.

Thanks for investigating it.

Kostas

> On Oct 1, 2017, at 12:50 AM, Ajay Krishna <aj...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Here is an example of a simple event I am trying to detect. The first and last lines are the interesting points/events. The CEP library is not able to detect something like that.
> 
> 4> (96,Sat Sep 30 22:30:25 UTC 2017,complex event,Low,32.781082,-117.01864,12.0,20.0)
> 
> 4> (96,Sat Sep 30 22:30:26 UTC 2017,complex event,High,32.781082,-117.01864,0.0235,20.0)
> 
> 4> (96,Sat Sep 30 22:30:27 UTC 2017,complex event,High,32.781082,-117.01864,0.02319611,20.0)
> 
> 4> (96,Sat Sep 30 22:30:28 UTC 2017,complex event,Medium,32.781082,-117.01864,0.023357224,20.0)
> 
> 4> (96,Sat Sep 30 22:30:29 UTC 2017,complex event,Low,32.781082,-117.01864,0.060904443,20.0)
> 
> 4> (96,Sat Sep 30 22:30:30 UTC 2017,complex event,Medium,32.781082,-117.01864,0.100115,20.0)
> 
> 4> (96,Sat Sep 30 22:30:31 UTC 2017,complex event,High,32.781082,-117.01864,0.12398389,20.0)
> 
> 4> (96,Sat Sep 30 22:30:32 UTC 2017,complex event,Medium,32.781082,-117.01864,0.15611167,20.0)
> 
> 4> (96,Sat Sep 30 22:30:33 UTC 2017,complex event,Low,32.781082,-117.01864,0.15817556,20.0)
> 
> 4> (96,Sat Sep 30 22:30:34 UTC 2017,complex event,Low,32.781082,-117.01864,0.09934334,20.0)
> 
> 4> (96,Sat Sep 30 22:30:35 UTC 2017,complex event,High,32.781082,-117.01864,16.0,20.0)
> 
> 
> 
> Notes about this experiment.
> 
> 1. Only one Kafka partition and just one topic.
>
> 2. Flink env parallelism is set to 4 and I am using AscendingTimestampExtractor on KafkaSource09.
>
> 3. In the data above, the first element is the id that I use for keyBy.
>
> 4. I started 4 Kafka producers in parallel with a random delay between them.
>
> 5. Each producer sends 10000 rows from a CSV file, taking about 18 seconds on average. Of the data from the 4 producers, only the events from one were detected.
>
> 6. Looking at the log files, I print the stream and see all 40000 lines, where each id is associated with one process number. In the above data, 96 is only associated with 4. In this case there is just one partition in Kafka. If I were to increase the number of partitions, each id would be spread across multiple processes.
>
> 7. I had run another test with a different set of 4 ids just before the one I've presented above; I expected to see 148 events for the 4 ids and I saw all of them being captured. I did not change anything as far as delays in the producer are concerned.
>
> The behavior is quite arbitrary and I suspect the cause is bugs FLINK-7549 <https://issues.apache.org/jira/browse/FLINK-7549> and FLINK-7606 <https://issues.apache.org/jira/browse/FLINK-7606>. Could you help me understand this further?
> 
> Best regards,
> 
> Ajay


Re: Issue with CEP library

Posted by Ajay Krishna <aj...@gmail.com>.
Hi Kostas,

Here is an example of a simple event I am trying to detect. The first and
last lines are the interesting points/events. The CEP library is not able
to detect something like that.

4> (96,Sat Sep 30 22:30:25 UTC 2017,complex event,Low,32.781082,-117.01864,12.0,20.0)

4> (96,Sat Sep 30 22:30:26 UTC 2017,complex event,High,32.781082,-117.01864,0.0235,20.0)

4> (96,Sat Sep 30 22:30:27 UTC 2017,complex event,High,32.781082,-117.01864,0.02319611,20.0)

4> (96,Sat Sep 30 22:30:28 UTC 2017,complex event,Medium,32.781082,-117.01864,0.023357224,20.0)

4> (96,Sat Sep 30 22:30:29 UTC 2017,complex event,Low,32.781082,-117.01864,0.060904443,20.0)

4> (96,Sat Sep 30 22:30:30 UTC 2017,complex event,Medium,32.781082,-117.01864,0.100115,20.0)

4> (96,Sat Sep 30 22:30:31 UTC 2017,complex event,High,32.781082,-117.01864,0.12398389,20.0)

4> (96,Sat Sep 30 22:30:32 UTC 2017,complex event,Medium,32.781082,-117.01864,0.15611167,20.0)

4> (96,Sat Sep 30 22:30:33 UTC 2017,complex event,Low,32.781082,-117.01864,0.15817556,20.0)

4> (96,Sat Sep 30 22:30:34 UTC 2017,complex event,Low,32.781082,-117.01864,0.09934334,20.0)

4> (96,Sat Sep 30 22:30:35 UTC 2017,complex event,High,32.781082,-117.01864,16.0,20.0)


Notes about this experiment.

1. Only one Kafka partition and just one topic.

2. Flink env parallelism is set to 4 and I am using
AscendingTimestampExtractor on KafkaSource09.

3. In the data above, the first element is the id that I use for keyBy.

4. I started 4 Kafka producers in parallel with a random delay between them.

5. Each producer sends 10000 rows from a CSV file, taking about 18 seconds
on average. Of the data from the 4 producers, only the events from one were
detected.

6. Looking at the log files, I print the stream and see all 40000 lines,
where each id is associated with one process number. In the above data, 96
is only associated with 4. In this case there is just one partition in
Kafka. If I were to increase the number of partitions, each id would be
spread across multiple processes.

7. I had run another test with a different set of 4 ids just before the one
I've presented above; I expected to see 148 events for the 4 ids and I saw
all of them being captured. I did not change anything as far as delays in
the producer are concerned.
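
Notes 1, 2 and 4 describe several producers writing into one partition
that is read with an ascending-timestamp assigner. Whether that matters in
this experiment is not established in the thread. The plain-Java sketch
below only illustrates the general mechanism: if the watermark trails the
largest timestamp seen so far by 1, records from a producer whose
timestamps lag behind another producer's arrive at or below the watermark,
and an event-time operator may treat them as late. The class and method
names are hypothetical, the timestamps are made up, and advancing the
watermark after every record is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; hypothetical names, not Flink code.
final class AscendingWatermarkSketch {

    // Walks timestamps in arrival order and returns those that arrive at or
    // below the watermark in effect at that moment, where the watermark is
    // the largest timestamp seen so far minus 1.
    static List<Long> lateTimestamps(long[] arrivalOrder) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            long watermark = (maxSeen == Long.MIN_VALUE) ? Long.MIN_VALUE : maxSeen - 1;
            if (ts <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Made-up data: producer A sends 100, 101, 102 and producer B sends
        // 50, 51, 52, interleaved in one partition.
        long[] interleaved = {100L, 50L, 101L, 51L, 102L, 52L};
        System.out.println(lateTimestamps(interleaved)); // prints "[50, 51, 52]"
    }
}
```

A stream whose timestamps really are ascending, such as 1, 2, 2, 3, yields
no late records under the same rule.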

The behavior is quite arbitrary and I suspect the cause is bugs
FLINK-7549 <https://issues.apache.org/jira/browse/FLINK-7549> and
FLINK-7606 <https://issues.apache.org/jira/browse/FLINK-7606>. Could you
help me understand this further?

Best regards,

Ajay



On Thu, Sep 28, 2017 at 8:39 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> Hi Ajay,
>
> After reading all the data from your source, could you somehow tell your
> sources to send a watermark of Long.MaxValue (or a high value)?
>
> I am asking this just to see if the problem is that the data is simply
> buffered inside Flink because there is a problem with the timestamps and
> the watermarks.
> You could also see this from the web UI, by looking at the size of your
> checkpointed state.
> If the size increases, it means that something is stored there.
>
> I will also have a deeper look.
>
> Kostas

Re: Issue with CEP library

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Ajay,

After reading all the data from your source, could you somehow tell your sources to send
a watermark of Long.MaxValue (or a high value)?

I am asking this just to see if the problem is that the data is simply buffered inside Flink because
there is a problem with the timestamps and the watermarks.
You could also see this from the web UI, by looking at the size of your checkpointed state.
If the size increases, it means that something is stored there.
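
The diagnostic above rests on the idea that an event-time operator holds
elements back until a watermark passes their timestamps. A minimal
plain-Java sketch of that idea follows. It is not Flink's operator code;
the class EventTimeBufferSketch and its methods are hypothetical names
used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java illustration only; hypothetical names, not Flink code.
final class EventTimeBufferSketch {

    // Buffered element timestamps, smallest first.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    // Stores an element until a watermark reaches its timestamp.
    void add(long timestamp) {
        buffered.add(timestamp);
    }

    // Releases, in timestamp order, every buffered element whose timestamp
    // is at or below the given watermark.
    List<Long> advanceWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= watermark) {
            released.add(buffered.poll());
        }
        return released;
    }

    // Number of elements still held back.
    int size() {
        return buffered.size();
    }

    public static void main(String[] args) {
        EventTimeBufferSketch buffer = new EventTimeBufferSketch();
        buffer.add(3L);
        buffer.add(1L);
        buffer.add(2L);
        // No watermark progress: nothing is released and the buffer keeps growing.
        System.out.println(buffer.advanceWatermark(Long.MIN_VALUE)); // prints "[]"
        // A final, very high watermark flushes everything.
        System.out.println(buffer.advanceWatermark(Long.MAX_VALUE)); // prints "[1, 2, 3]"
    }
}
```

If the missing matches appear once such a final watermark is sent, the data
was buffered rather than lost, which is the distinction the suggestion is
meant to draw.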

I will also have a deeper look.

Kostas

> On Sep 28, 2017, at 5:17 PM, Ajay Krishna <aj...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Thank you for reaching out and for the suggestions. Here are the results
> 
> 1. Using an env parallelism of 1 performed similarly, with the additional problem that there was significant lag in the Kafka topic.
> 2. I removed the additional keyBy(0) but that did not change anything.
> 3. I also tried checking for only the start pattern and it was exactly the same: I saw one of the homes going through but 3 others just getting dropped.
> 4. I also tried slowing down the rate into Kafka from 5000/second to about 1000/second but I see similar results.
>
> I was wondering if you had any other solutions to the problem. I am especially concerned about 1 and 3. Is this library under active development? Is there a JIRA open on this issue, or could we open one to track this?
>
>
> I was reading on Stack Overflow and found a user who had a very similar issue in Aug '16. I also contacted him to discuss the issue and learnt that the pattern of failure was exactly the same.
> 
> https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic <https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic>
> 
> 
> Before I found the above post, I created a post for this issue
> https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern <https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern>
> 
> 
> 
> I would really appreciate your guidance on this. 
> 
> Best regards,
> Ajay


Re: Issue with CEP library

Posted by Ajay Krishna <aj...@gmail.com>.
Hi Kostas,

Thank you for reaching out and for the suggestions. Here are the results:

1. Using an env parallelism of 1 gave similar results, with the additional
problem that there was significant lag on the Kafka topic.
2. I removed the additional keyBy(0), but that did not change anything.
3. I also tried checking for the start-only pattern, and the result was
exactly the same: I saw one of the homes going through but 3 others just
getting dropped.
4. I also tried slowing down the rate into Kafka from 5000/second to about
1000/second, but I see similar results.

I was wondering if you had any other solutions to the problem. I am
especially concerned about 1 and 3. Is this library under active development?
Is there a JIRA open on this issue, and if not, could we open one to track this?


I was reading on Stack Overflow and found that a user had a very similar
issue in Aug '16. I contacted him to discuss the issue and learnt
that the pattern of failure was exactly the same.

https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic


Before I found the above post, I had created a post for this issue:
https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern



I would really appreciate your guidance on this.

Best regards,
Ajay





On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> Hi Ajay,
>
> I will look a bit more into the issue.
>
> But in the meantime, could you run your job with a parallelism of 1, to see
> if the results are as expected?
>
> Also, could you change the pattern, for example to check only for the start,
> to see if all keys pass through?
>
> As for the code, you apply keyBy(0) to the cepMap stream twice, which is
> redundant and introduces latency.
> You could remove that to also see the impact.
>
> Kostas

Re: Issue with CEP library

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Ajay,

I will look a bit more into the issue.

But in the meantime, could you run your job with a parallelism of 1, to see if the results are as expected?

Also, could you change the pattern, for example to check only for the start, to see if all keys pass through?

As for the code, you apply keyBy(0) to the cepMap stream twice, which is redundant and introduces latency.
You could remove that to also see the impact.

Kostas

> On Sep 28, 2017, at 2:57 AM, Ajay Krishna <aj...@gmail.com> wrote:
> 
> Hi, 
> 
> I've only been working with Flink for the past 2 weeks on a project and am trying to use the CEP library on sensor data. I am using Flink version 1.3.2. Flink has a Kafka source; I am using KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 TaskManagers and 4 task slots.
> 
> What I observe is the following. The input to Kafka is a JSON string and when parsed on the Flink side, it looks like this:
> 
> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
> 
> I use a Tuple8 to capture the parsed data. The first field is home_id. The time characteristic is set to EventTime and I have an AscendingTimestampExtractor using the timestamp field. The parallelism for the execution environment is set to 4. I have a rather simple event that I am trying to capture:
> 
> DataStream<Tuple8<Integer,Date,String,String,Float,Float,Float, Float>> cepMapByHomeId = cepMap.keyBy(0);
> 
>             //cepMapByHomeId.print();
> 
>             Pattern<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>, ?> cep1 =
>                             Pattern.<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>>begin("start")
>                                             .where(new OverLowThreshold())
>                                             .followedBy("end")
>                                             .where(new OverHighThreshold());
> 
> 
>             PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream = CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
> 
> 
>             DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts = patternStream.select(new PackageCapturedEvents());
> 
> The pattern checks if the 7th field in the Tuple8 goes over 12 and then over 16. The output of the pattern looks like this:
> 
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
> 
> On the Kafka producer side, I am trying to send simulated data for around 100 homes, so the home_id would go from 0-100 and the input is keyed by home_id. I have about 10 partitions in Kafka. The producer just loops through a csv file with a delay of about 100 ms between 2 rows of the csv file. The data is exactly the same for all 100 of the csv files except for home_id and the lat & long information. The timestamp is incremented by a step of 1 sec. I start multiple processes to simulate data from different homes.
> 
> THE PROBLEM:
> 
> Flink completely misses capturing events for a large subset of the input data. I barely see the events for about 4-5 of the home_id values. I do a print before applying the pattern and after, and I see all home_ids before and only a tiny subset after. Since the data is exactly the same, I expect all home_ids to be captured and written to my sink, which is Cassandra in this case. I've looked through all available docs and examples but cannot seem to find a fix for the problem.
> 
> I would really appreciate some guidance on how to understand and fix this.
> 
> 
> 
> Thank you,
> 
> Ajay
>