Posted to user@flink.apache.org by Sonex <al...@gmail.com> on 2017/05/16 16:03:42 UTC

FlinkCEP latency/throughput

Hello everyone,

I am testing some patterns with FlinkCEP and I want to measure latency and
throughput when using one or more processing cores. How can I do that?

What I have done so far:
Latency: Each time an event arrives I store the system time
(System.currentTimeMillis). When Flink calls the select function, which means
we have a full pattern match, I take the system time again. The difference
between the system time taken at the first event of the complex event and the
system time taken when the function is called is my latency for now.
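That bookkeeping can be sketched in plain Java, without any Flink dependencies. The `TimedEvent` wrapper and the field names below are made up for illustration; the idea is simply to attach the arrival timestamp to each event on entry and subtract it from the current time inside the select function:

```java
// Sketch of the latency measurement described above: carry an arrival
// timestamp with each event, and when the full pattern match is emitted,
// subtract the first event's arrival time from the emission time.
public class LatencySketch {

    // Hypothetical event wrapper: payload plus arrival time in millis.
    public static final class TimedEvent {
        final String payload;
        final long arrivalMillis;

        TimedEvent(String payload, long arrivalMillis) {
            this.payload = payload;
            this.arrivalMillis = arrivalMillis;
        }
    }

    // Latency as defined in the post: time of match emission minus the
    // arrival time of the first event of the matched complex event.
    public static long latencyMillis(TimedEvent firstInMatch, long matchEmittedMillis) {
        return matchEmittedMillis - firstInMatch.arrivalMillis;
    }

    public static void main(String[] args) {
        TimedEvent first = new TimedEvent("start", System.currentTimeMillis());
        // ... pattern matching would happen here ...
        long latency = latencyMillis(first, System.currentTimeMillis());
        System.out.println("latency ms: " + latency);
    }
}
```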

Throughput: I divide the total number of events in the dataset by the
time taken to complete the experiment.
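As a sketch, that division is just (assuming the elapsed time is measured in milliseconds):

```java
// Throughput as described above: total events processed divided by the
// wall-clock duration of the experiment, expressed in events per second.
public class ThroughputSketch {

    public static double eventsPerSecond(long totalEvents, long elapsedMillis) {
        return totalEvents * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // e.g. 1,000,000 events in 20 s -> 50,000 events/s
        System.out.println(eventsPerSecond(1_000_000L, 20_000L)); // prints 50000.0
    }
}
```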



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-tp13170.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: FlinkCEP latency/throughput

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hello Alfred,

Just some considerations from my side regarding latency. I think the
first step should be defining what "latency" really means for a CEP
library.
The first thing that comes to my mind is the time period between the
arrival of an event that should trigger a match (the ending pattern) and the
actual time when the match is emitted (for that, the select function is a good
place, I think).

I think Kostas was also referring to a similar kind of issue.

Hope it will be helpful.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-05-19 10:59 GMT+02:00 Sonex <al...@gmail.com>:

> Hello Kostas,
>
> thanks for your response. Regarding throughput, it makes sense.
>
> But there is still one question remaining: how can I measure the latency of
> my FlinkCEP application?
>
> Maybe you answered it, but I didn't quite get that. As for your question 2
> about measuring latency, the answer is yes: the first element in the
> matching pattern will inevitably wait longer than the last one.
>
> Thank you for your time!

Re: FlinkCEP latency/throughput

Posted by Sonex <al...@gmail.com>.
Hello Kostas,

thanks for your response. Regarding throughput, it makes sense.

But there is still one question remaining: how can I measure the latency of
my FlinkCEP application?

Maybe you answered it, but I didn't quite get that. As for your question 2
about measuring latency, the answer is yes: the first element in the
matching pattern will inevitably wait longer than the last one.

Thank you for your time!




Re: FlinkKafkaConsumer using Kafka-GroupID?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Valentin!

Your understanding is correct: the Kafka connectors do not use the consumer-group functionality to distribute messages across multiple instances of a FlinkKafkaConsumer source. Instead, the connector itself determines which instances are assigned which Kafka partitions, using a simple round-robin distribution.
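A simplified sketch of such a round-robin assignment is below. This illustrates the idea only, not Flink's actual internal code; the class and method names are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of round-robin partition assignment: partition p is read by the
// consumer subtask with index p % numSubtasks. Every partition is read by
// exactly one subtask, with no Kafka consumer-group coordination involved.
public class PartitionAssignmentSketch {

    public static List<Integer> partitionsFor(int subtaskIndex, int numSubtasks,
                                              int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numSubtasks == subtaskIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 5 partitions, 2 subtasks: subtask 0 gets [0, 2, 4], subtask 1 gets [1, 3]
        System.out.println(partitionsFor(0, 2, 5)); // prints [0, 2, 4]
        System.out.println(partitionsFor(1, 2, 5)); // prints [1, 3]
    }
}
```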

Is there any way to run 2 different Flink (standalone) apps such that messages from a single Kafka topic are consumed only once? This is what I could do by using 2 native Kafka consumers within the same consumer group.

I don’t think this is possible with the FlinkKafkaConsumers. However, this is exactly what Flink’s checkpointing and savepoints are designed for.
If your single app fails, using checkpoints / savepoints the consumer can just re-start from the offsets in that checkpoint / savepoint.
In other words, with Flink’s streaming fault tolerance mechanics, you will get exactly-once guarantees across 2 different runs of the app.
The FlinkKafkaConnector docs should explain this thoroughly [1].

Does this address your concerns?

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance


On 18 May 2017 at 1:35:35 AM, Valentin (valentin@aseno.de) wrote:

Hi there,

As far as I understood, Flink Kafka connectors don’t use the consumer group management feature from Kafka. Here is the post I got the info from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html#none

For some reasons we cannot set up a Flink cluster environment, but we still need to ensure high availability, e.g. in case one node goes down, the second should keep running.


My question:
- Is there any way to run 2 different Flink (standalone) apps such that messages from a single Kafka topic are consumed only once? This is what I could do by using 2 native Kafka consumers within the same consumer group.

Many thanks in advance
Valentin 
 

FlinkKafkaConsumer using Kafka-GroupID?

Posted by Valentin <va...@aseno.de>.
Hi there,

As far as I understood, Flink Kafka connectors don’t use the consumer group management feature from Kafka. Here is the post I got the info from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html#none

For some reasons we cannot set up a Flink cluster environment, but we still need to ensure high availability, e.g. in case one node goes down, the second should keep running.


My question:
- Is there any way to run 2 different Flink (standalone) apps such that messages from a single Kafka topic are consumed only once? This is what I could do by using 2 native Kafka consumers within the same consumer group.

Many thanks in advance
Valentin 
 

Re: FlinkCEP latency/throughput

Posted by Dean Wampler <de...@lightbend.com>.
On Wed, May 17, 2017 at 10:34 AM, Kostas Kloudas <
k.kloudas@data-artisans.com> wrote:

> Hello Alfred,
>
> As a first general remark, Flink was not optimized for multicore
> deployments
> but rather for distributed environments. This implies overheads
> (serialization,
> communication etc), when compared to libs optimized for multicores. So
> there
> may be libraries that are better optimized for those settings if you are
> planning
> to use just a multicore machine.
>
> Now for your suggestion:
>
...

If you're interested in a multi-core option, check out Akka Streams
<http://doc.akka.io/docs/akka/2.4.17/scala/stream/index.html> or perhaps the
underlying Actor Model <http://doc.akka.io/docs/akka/2.4.17/scala.html>.



-- 
*Dean Wampler, Ph.D.*
VP, Fast Data Engineering

<http://lightbend.com>

dean.wampler@lightbend.com
@deanwampler <http://twitter.com/deanwampler>
https://www.linkedin.com/in/deanwampler
https://github.com/deanwampler

Re: FlinkCEP latency/throughput

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hello Alfred,

As a first general remark, Flink was not optimized for multicore deployments
but rather for distributed environments. This implies overheads (serialization,
communication, etc.) when compared to libraries optimized for multicores, so
there may be libraries that are better optimized for those settings if you are
planning to use just a multicore machine.

Now for your suggestion:

> On May 16, 2017, at 6:03 PM, Sonex <al...@gmail.com> wrote:
> 
> Hello everyone,
> 
> I am testing some patterns with FlinkCEP and I want to measure latency and
> throughput when using 1 or more processing cores. How can I do that ??
> 
> What I have done so far:
> Latency: Each time an event arrives I store the system time
> (System.currentTimeMillis). When flink calls the select function which means
> we have a full pattern match, again I take the system time. The difference
> of the system time taken from the first event of the complex event and the
> system time taken when the function is called is the latency for now.
> 

1) If you are using event time, then you are also accounting for internal buffering and 
ordering of the incoming events.
 
2) I am not sure that measuring the time between the arrival of each element and
the emission of its matching pattern makes much sense. In a long pattern, the first
element in the matching pattern will inevitably wait longer than the last one, right?

> Throughput: I divide the total number of the events of the dataset by the
> time taken to complete the experiment.
> 
> 

For throughput, you could create a job containing only your CEP pattern and a sink
that does nothing, and count the elements read by your source per minute. If your
source is not the bottleneck, then the CEP part of the pipeline is the dominating
factor (given that your sink just discards everything, it cannot create backpressure).
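The discarding sink idea can be sketched in plain Java. This is only an illustration of the counting logic, not the actual Flink SinkFunction interface; the class and method names are invented:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a sink that counts records and drops them, so the sink itself
// can never become the bottleneck or create backpressure.
public class CountingDiscardSink<T> {

    private final AtomicLong count = new AtomicLong();

    public void invoke(T record) {
        count.incrementAndGet(); // the record is otherwise discarded
    }

    // Events per minute over the measured interval.
    public double ratePerMinute(long elapsedMillis) {
        return count.get() * 60_000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        CountingDiscardSink<String> sink = new CountingDiscardSink<>();
        for (int i = 0; i < 600; i++) sink.invoke("event");
        System.out.println(sink.ratePerMinute(60_000L)); // 600 events in one minute -> 600.0
    }
}
```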

I hope this helps,
Kostas