Posted to user@flink.apache.org by Yunus Olgun <yu...@gmail.com> on 2017/08/04 13:23:55 UTC

Synchronized Kafka sources

Hi,

Is it possible to synchronize two Kafka sources, so that they consume from
different Kafka topics at roughly the same event time?

My use case: I have two Kafka topics, A (very large) and B (large). The
mapping between A and B is one-to-one or one-to-zero. The topology simply
joins A and B in a tumbling time window and aggregates the joined data.

In real time this is not a problem. But when I start the job on the last
week of data, it becomes very slow, because by the time source A has
consumed 1 minute of data from Kafka, source B has consumed 1 hour. Since
the watermark advances with the slowest of the parent operators, the data
from source B creates many windows that stay in memory until they are
triggered later. That increases state size. Checkpoints get bigger and
bigger, and the job becomes slower.
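
To make the effect concrete, here is a minimal toy model in plain Python
(not Flink code). It assumes a 1-minute tumbling window, which is not stated
above, and treats a window as open until the watermark reaches its end. The
function names are made up for illustration.

```python
# Toy model, not Flink code: how many tumbling windows stay open when one
# source runs ahead of the other. All names and sizes are assumptions.

def combined_watermark(input_watermarks):
    """A downstream operator's watermark is the minimum over its inputs."""
    return min(input_watermarks)

def open_windows(max_event_time_seen, watermark, window_size):
    """Count tumbling windows that hold data but cannot fire yet.

    In this model a window [start, start + window_size) fires once the
    watermark reaches its end, so every window that ends after the
    watermark is still held in state.
    """
    first_open = watermark // window_size            # window containing the watermark
    last_open = max_event_time_seen // window_size   # newest window that has data
    return max(0, last_open - first_open + 1)

MINUTE = 60  # event time in seconds

# Numbers from the description: A has read 1 minute of event time,
# B has read 1 hour. The 1-minute window size is an assumption.
wm_a, wm_b = 1 * MINUTE, 60 * MINUTE
wm = combined_watermark([wm_a, wm_b])
buffered = open_windows(max_event_time_seen=wm_b, watermark=wm, window_size=MINUTE)
print(wm, buffered)
```

In this model the number of buffered windows grows with the gap between
the two sources, which is why the state keeps growing while B runs ahead.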

I have tried putting an operator after the sources that writes event times
to an external system. If a source is far ahead of the other one, it sleeps
for a short time, consumes a little, then checks again and sleeps again if
necessary. This map operator made checkpoint times much longer. I guess
sleeping in an operator is not a good idea with the checkpoint
mechanism.
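
The throttling idea can be sketched as a small simulation in plain Python.
This only models the pause decision, not a Flink operator, and it does not
address the checkpoint slowdown. The names should_pause and simulate and the
max_drift threshold are hypothetical.

```python
# Illustrative only: names and the drift threshold are assumptions.

def should_pause(own_event_time, other_event_times, max_drift):
    """Return True if this source is more than max_drift ahead of the slowest peer."""
    if not other_event_times:
        return False
    slowest = min(other_event_times)
    return own_event_time - slowest > max_drift

def simulate(rate_a, rate_b, max_drift, steps):
    """Advance two sources step by step; a source skips a step while too far ahead.

    rate_a and rate_b are seconds of event time consumed per step.
    Returns the largest gap between the two sources seen during the run.
    """
    t_a = t_b = 0
    worst_gap = 0
    for _ in range(steps):
        if not should_pause(t_a, [t_b], max_drift):
            t_a += rate_a
        if not should_pause(t_b, [t_a], max_drift):
            t_b += rate_b
        worst_gap = max(worst_gap, abs(t_a - t_b))
    return worst_gap

# B reads 60x faster than A, as in the 1 minute vs 1 hour example.
unthrottled = simulate(rate_a=1, rate_b=60, max_drift=float("inf"), steps=100)
throttled = simulate(rate_a=1, rate_b=60, max_drift=120, steps=100)
print(unthrottled, throttled)
```

With the pause check in place, the gap stays within max_drift plus one step
of the faster source, instead of growing with every step.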

Is there a way to make two or more sources consume from Kafka in a
synchronized way using Flink?

Re: Synchronized Kafka sources

Posted by 魏偉哲 <to...@gmail.com>.
Hi Yunus,

I'm not sure whether there is a way to synchronize two Kafka sources in
Flink, but I have another suggestion for this problem.

How about adjusting the number of partitions (shards) and the parallelism
of the consumers on A and B? For example, give A a higher parallelism and B
a lower one, so that A is processed faster.
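
As a rough way to pick a starting ratio, here is a small Python sketch. It
assumes throughput scales linearly with parallelism, which is only an
approximation, and the function name and the per-subtask rates are
hypothetical inputs you would have to measure yourself.

```python
import math

def parallelism_for_balance(rate_a, rate_b, parallelism_b, partitions_a):
    """Estimate the parallelism A needs to keep pace with B.

    rate_a, rate_b: event time consumed per second by ONE subtask of each
    source (measured values, assumed here). Assumes linear scaling with
    parallelism. The result is capped at A's partition count, because each
    Kafka partition is read by only one subtask and extra subtasks would
    have nothing to read.
    """
    target = rate_b * parallelism_b        # total pace of B
    needed = math.ceil(target / rate_a)    # subtasks of A needed to match it
    return min(needed, partitions_a)

# B is 60x faster per subtask, as in the 1 minute vs 1 hour example.
enough_partitions = parallelism_for_balance(1, 60, 1, partitions_a=128)
too_few_partitions = parallelism_for_balance(1, 60, 1, partitions_a=32)
print(enough_partitions, too_few_partitions)
```

If the cap applies, as in the second call, A also needs more partitions
before a higher parallelism can help.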

With this approach you need to find a good ratio to balance the two
consumers, because rescaling operators is currently manual. However,
automatic rescaling will be added in the future, and you could then use
some mechanism to rescale the operators automatically, similar to what you
did to make the operator sleep.
https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html#rescaling-stateful-stream-processing-jobs

Hope this will help you.

Regards,
Tony Wei

