Posted to user@flink.apache.org by Dromit <dr...@gmail.com> on 2016/10/31 02:37:18 UTC

Kafka + Flink, Basic Questions

Hello,

I'm writing an application to generate reports from a real-time stream of
sale records (product, quantity, price, time) that I get from a websocket,
and I plan to use Kafka and Flink to do so. The result of the pipeline
should be, for instance, "the average price for a certain product in the
last 3 hours, minute by minute", "the volume (sum of quantities) for a
certain product in the last hour, second by second", etc.

I want to balance the load on a cluster, given that I'm receiving far too
many records to keep in memory on a single computer, around 10,000 each
second. I intend to distribute them over a Kafka topic "sales" with
multiple partitions (one for each product). That way all the records for a
given product are kept on the same node, while different products are
processed on different nodes.
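
For reference, the producer side I have in mind looks roughly like this (a
minimal sketch; the class name and record format are just placeholders, and
I'm assuming string-serialized records and a local broker). Keying each
message by product makes Kafka hash all records of one product to the same
partition:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SalesProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Using the product as the message key makes Kafka send every
            // record of that product to the same partition of "sales".
            String product = "product-42";
            String record = "product-42,3,19.99,1477881438"; // product,quantity,price,time
            producer.send(new ProducerRecord<>("sales", product, record));

            producer.close();
        }
    }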

There are some things I still don't get about how Kafka and Flink are
supposed to work together. The following questions may be rather general,
because I have no experience with either of them, so any additional
insight or advice you can provide is very welcome.

*1. Architecture*

Keeping all records for a given product on the same node, as mentioned,
should lower the latency since all consumers will handle only local data.
That works in theory only, because if I'm not mistaken Flink executes the
operators on its own cluster of nodes.

How does Flink know where to process what? What happens if there are more
nodes running Flink than Kafka, or the other way around? In case I run one
Kafka node and one Flink node together on each machine of the cluster (AWS),
how can I be sure each node gets the correct data (i.e., only local data)?

*2. Performance*

Consider computing the volume of a product in a sliding window, where the
quantities are 10, 3, 6, 1, 0, 15, .... For the first window I may need to
compute 10+3+6+1, for the second 3+6+1+0, for the third 6+1+0+15, and so
on. The problem here is that in each case I compute some of the sums
multiple times (6+1, for instance, is computed 3 times), and it is even
worse considering a window may have thousands of records and that some
operations are not that simple (standard deviation, for instance).

Is there a way to reuse the result of previous operations to avoid this
problem? Is there any performance improvement to apply in these cases?

*3. Topology*

The pipeline of operations to get all the information for a report is
really long. It may require an average of prices, the volume, the standard
deviation, etc. But some of these operations can be executed concurrently.

How do I define the workflow (topology) so that certain "steps" are
executed concurrently, while others wait for all the previous steps to be
completed before proceeding?

*4. Launch*

The producer and all the consumers have many Java classes in common, so for
simplicity I intend to create and launch them from a single
application/process, maybe creating one thread for each if required.

Is there any problem with that? Any advantage of creating an independent
application for each producer and consumer as shown in the documentation?

-

Best regards,
Matt

Re: Kafka + Flink, Basic Questions

Posted by Robert Metzger <rm...@apache.org>.
Hi Matt,

These are fairly extensive questions. I'll try to answer all of them, but I
don't have time right now to discuss the architecture of your application
in depth. Maybe someone else on the ML can extend my answers.

(Answers in-line below)

On Mon, Oct 31, 2016 at 3:37 AM, Dromit <dr...@gmail.com> wrote:

> Hello,
>
> I'm writing an application to generate reports from a real-time stream of
> sale records (product, quantity, price, time) that I get from a websocket,
> and I plan to use Kafka and Flink to do so. The result of the pipeline
> should be, for instance, "the average price for a certain product in the
> last 3 hours, minute by minute", "the volume (sum of quantities) for a
> certain product in the last hour, second by second", etc.
>
> I want to balance the load on a cluster, given that I'm receiving far too
> many records to keep in memory on a single computer, around 10,000 each
> second. I intend to distribute them over a Kafka topic "sales" with
> multiple partitions (one for each product). That way all the records for a
> given product are kept on the same node, while different products are
> processed on different nodes.
>

The data for such windows doesn't need to fit into memory. With the RocksDB
state backend, your limit is the available disk space on each machine.
How large is each individual record? (I'm asking to get a feeling for the
load you are putting on Flink ... I think 10k records / sec isn't that
much.)
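
Enabling RocksDB is a one-liner on the execution environment. A minimal
sketch (the checkpoint URI is just a placeholder; point it at your own
HDFS/S3/file path):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Keyed/window state now lives in RocksDB on local disk, so it is
            // bounded by disk space rather than by the JVM heap.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            env.enableCheckpointing(60_000); // checkpoint state every minute
            // ... define sources, windows and sinks here, then env.execute(...)
        }
    }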

As long as the products have a similar number of messages, you won't run
into any load balancing issues.
I would recommend not over-optimizing this in the first implementation.
The bottleneck may very well end up somewhere else.


>
> There are some things I still don't get about how Kafka and Flink are
> supposed to work together. The following questions may be rather general,
> because I have no experience with either of them, so any additional
> insight or advice you can provide is very welcome.
>

Both Flink and Kafka are easy to set up. I would recommend just diving in
and trying them out.


>
> *1. Architecture*
>
> Keeping all records for a given product on the same node, as mentioned,
> should lower the latency since all consumers will handle only local data.
> That works in theory only, because if I'm not mistaken Flink executes the
> operators on its own cluster of nodes.
>
> How does Flink know where to process what? What happens if there are more
> nodes running Flink than Kafka, or the other way around? In case I run one
> Kafka node and one Flink node together on each machine of the cluster (AWS),
> how can I be sure each node gets the correct data (i.e., only local data)?
>

Flink currently does not account for locality when reading from Kafka.
What latencies are you aiming for? I don't expect the Kafka connector to
add much latency to your pipeline.

This blog post goes into the details of how data consumers are assigned to
partitions: http://data-artisans.com/kafka-flink-a-practical-how-to/
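
To give you an idea, the consumer side of your job would look roughly like
this (a sketch assuming Kafka 0.9 and plain string records; pick the
connector class matching your broker version):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class SalesJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "sales-reports");

            // Flink spreads the partitions of "sales" over the parallel source
            // instances; each partition is read by exactly one instance.
            DataStream<String> sales = env.addSource(
                    new FlinkKafkaConsumer09<>("sales", new SimpleStringSchema(), props));

            sales.print();
            env.execute("sales report job");
        }
    }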


>
> *2. Performance*
>
> Consider computing the volume of a product in a sliding window, where the
> quantities are 10, 3, 6, 1, 0, 15, .... For the first window I may need to
> compute 10+3+6+1, for the second 3+6+1+0, for the third 6+1+0+15, and so
> on. The problem here is that in each case I compute some of the sums
> multiple times (6+1, for instance, is computed 3 times), and it is even
> worse considering a window may have thousands of records and that some
> operations are not that simple (standard deviation, for instance).
>
> Is there a way to reuse the result of previous operations to avoid this
> problem? Is there any performance improvement to apply in these cases?
>

I don't think the default operators do any of these optimizations, but
you could implement them with our custom windowing API.
Check out this blog post:
https://flink.apache.org/news/2015/12/04/Introducing-windows.html
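
One thing that helps out of the box: if you aggregate with a ReduceFunction,
Flink keeps a single running value per window and folds each record in once
as it arrives, rather than recomputing the whole sum when a window fires. A
rough fragment (assuming you have parsed the Kafka records into a stream of
(product, quantity) pairs, here called "quantities"):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // quantities: DataStream<Tuple2<String, Integer>> of (product, quantity)
    DataStream<Tuple2<String, Integer>> volume = quantities
            .keyBy(0) // key by product
            .timeWindow(Time.hours(1), Time.seconds(1)) // 1h window, sliding by 1s
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                      Tuple2<String, Integer> b) {
                    // each record is folded into the running sum once per window
                    // it belongs to; nothing is recomputed when a window fires
                    return new Tuple2<>(a.f0, a.f1 + b.f1);
                }
            });

Even the standard deviation fits this pattern if the accumulator carries
(count, sum, sum of squares) instead of a single number.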



>
> *3. Topology*
>
> The pipeline of operations to get all the information for a report is
> really long. It may require an average of prices, the volume, the standard
> deviation, etc. But some of these operations can be executed concurrently.
>
> How do I define the workflow (topology) so that certain "steps" are
> executed concurrently, while others wait for all the previous steps to be
> completed before proceeding?
>

I'm skipping most of this one. I would recommend playing around with our
windowing API; then it should become obvious how we parallelize, how you
emit results, and how you can compute multiple reports/numbers at once.
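
One concrete hint, though: applying several operators to the same DataStream
fans the topology out, and Flink executes the resulting operators
concurrently without any threading on your side. A sketch, reusing the
hypothetical "quantities" stream from above:

    // Fanning out: both window operators consume the same stream and run
    // concurrently as separate operators; Flink schedules them for you.
    DataStream<Tuple2<String, Integer>> volumePerSecond = quantities
            .keyBy(0)
            .timeWindow(Time.hours(1), Time.seconds(1))
            .sum(1);

    DataStream<Tuple2<String, Integer>> volumePerMinute = quantities
            .keyBy(0)
            .timeWindow(Time.hours(3), Time.minutes(1))
            .sum(1);

A step that has to wait for earlier steps simply consumes their output
streams (e.g. via union or connect), so that ordering falls out of the
dataflow itself.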


>
> *4. Launch*
>
> The producer and all the consumers have many Java classes in common, so
> for simplicity I intend to create and launch them from a single
> application/process, maybe creating one thread for each if required.
>
> Is there any problem with that? Any advantage of creating an independent
> application for each producer and consumer as shown in the documentation?
>

Check out this documentation page about the execution model:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/index.html
Usually, you don't have to worry about threading etc.
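
Sharing classes and launching everything from one process is fine: the Flink
part of your program is only a plan, and execute() submits it to the cluster
(or runs a local mini-cluster), with Flink managing all parallelism itself.
A sketch, reusing the hypothetical SalesProducer and SalesJob classes from
the snippets above:

    public class Launcher {
        public static void main(String[] args) throws Exception {
            // The websocket -> Kafka producer is an ordinary thread here.
            Thread producer = new Thread(() -> SalesProducer.main(new String[0]));
            producer.start();

            // The Flink program only declares the dataflow; execute() inside
            // SalesJob submits it. No per-consumer threads are needed.
            SalesJob.main(new String[0]);
        }
    }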