Posted to user@flink.apache.org by Björn Hedström <bj...@gmail.com> on 2017/08/04 07:40:18 UTC

CEP with Kafka source

Hi,

I am writing a small application which monitors a couple of directories for
files that are read into Kafka and later consumed by Flink. Flink then
performs some operations on the records (such as extracting the embedded
timestamp) and tries to find a pattern using CEP. Since the data can be out
of order I am using a BoundedOutOfOrdernessTimestampExtractor, allowing
elements to arrive up to 24 hours late. The TimeCharacteristic is set to
EventTime.
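
Roughly, the relevant part of the job looks like the sketch below. This is
only a fragment: MyRecord, MyRecordSchema, the topic name and the timestamp
accessor are placeholders, not the actual names from my code.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "cep-test");

FlinkKafkaConsumer010<MyRecord> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new MyRecordSchema(), kafkaProps);

DataStream<MyRecord> stream = env
        .addSource(consumer)
        .assignTimestampsAndWatermarks(
                // allow elements to be up to 24 hours out of order
                new BoundedOutOfOrdernessTimestampExtractor<MyRecord>(Time.hours(24)) {
                    @Override
                    public long extractTimestamp(MyRecord record) {
                        // event timestamp embedded in the record, in milliseconds
                        return record.getEmbeddedTimestamp();
                    }
                });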

However, here is where I run into some issues. I noticed that Flink does
not start to process the data through the defined pattern until the
watermark is greater than the timestamp of the record. This issue does not
appear when using a text file directly as a source and disregarding Kafka.
In practice this could mean that a pattern consisting of only two
consecutive datapoints would not be found until another 22 subsequent
datapoints have been collected. It seems that I am missing something
fundamental here, and any help would be appreciated.
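
For reference, the kind of two-event pattern I mean is roughly the
following (the step names and conditions are just made up for
illustration):

Pattern<MyRecord, ?> pattern = Pattern.<MyRecord>begin("first")
        .where(new SimpleCondition<MyRecord>() {
            @Override
            public boolean filter(MyRecord record) {
                return record.isStart();   // placeholder condition
            }
        })
        .next("second")
        .where(new SimpleCondition<MyRecord>() {
            @Override
            public boolean filter(MyRecord record) {
                return record.isEnd();     // placeholder condition
            }
        });

PatternStream<MyRecord> matches = CEP.pattern(stream, pattern);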

I am using a FlinkKafkaConsumer010, Flink 1.3.0 and Kafka 0.11.0.0.

Best,
Björn

Re: CEP with Kafka source

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Björn,

You are correct that the CEP library buffers events until a watermark with a greater timestamp arrives. This is because the order of events is crucial for CEP.
Imagine a pattern like "A next B" and the sequence a(t=1) c(t=10) b(t=2). If we did not wait for the watermark and sort the events once it arrives, we would not be able to produce proper results.
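
To make that concrete: BoundedOutOfOrdernessTimestampExtractor emits watermarks that trail the highest timestamp seen so far by the configured bound, roughly as in the simplified sketch below (not the actual Flink source; MyRecord and its accessor are placeholders). With a 24 hour bound, an event with timestamp t is therefore only pushed through the pattern once some event with a timestamp greater than t + 24h has been read from Kafka.

// Simplified illustration of what BoundedOutOfOrdernessTimestampExtractor does
public class TrailingWatermarks implements AssignerWithPeriodicWatermarks<MyRecord> {

    private final long maxOutOfOrderness = Time.hours(24).toMilliseconds();
    private long currentMax = Long.MIN_VALUE + maxOutOfOrderness;

    @Override
    public long extractTimestamp(MyRecord record, long previousElementTimestamp) {
        long ts = record.getEmbeddedTimestamp();   // placeholder accessor
        currentMax = Math.max(currentMax, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark lags the largest timestamp seen so far by 24 hours,
        // so CEP only releases an event once a sufficiently newer one arrives.
        return new Watermark(currentMax - maxOutOfOrderness);
    }
}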

I don’t know what your text-file approach looks like, but if it behaves differently I would assume you are not actually working in EventTime.

Regards,
Dawid

