Posted to user@flink.apache.org by Oleksandr Baliev <al...@gmail.com> on 2017/08/29 08:54:32 UTC

Kafka consumer are too fast for some partitions in "flatMap" like jobs

Hello,

There is a Flink job that consumes from a Kafka topic (TOPIC_IN), simply
applies flatMap / map to the data, and pushes it to another Kafka topic
(TOPIC_OUT). TOPIC_IN has around 30 partitions, data is more or less
sequential per partition, and the job has parallelism 30. So in theory there
should be a 1:1 mapping between consumers and partitions.

But we often see a big offset lag for some partitions. That should mean
that some consumers are slower than others (e.g. because of network issues
with a particular broker host, or anything else). So data in the TOPIC_OUT
partitions is distributed but not sequential at all.

So when another Flink job consumes from TOPIC_OUT and uses
BoundedOutOfOrdernessTimestampExtractor to generate watermarks, there can be
a lot of late data because of the difference in data timestamps. Maybe
something is missing in this setup, of course, or there is a better approach
for such flatMap / map jobs.

Setting a large WindowedStream#allowedLateness or giving more time to
BoundedOutOfOrdernessTimestampExtractor will increase memory consumption
and will probably cause other issues, and there can still be late data,
which is not good for later windows.

One solution is to have some shared place to synchronize the lowest
timestamp between consumers and somehow slow down consumption (Thread
sleep, wait, while loop with a condition...).

0. Is there any good approach to handling such "Kafka <- flatMap / map ->
Kafka" tasks, so that data in TOPIC_OUT is as sequential as in TOPIC_IN?

1. As far as I can see, slow consumers should be a common problem for big
Kafka topics with a lot of partitions, shouldn't they? How do Flink/Kafka
handle it?

2. Does somebody know whether there is any mechanism in Flink / Kafka
(backpressure?) that lets a downstream operator (some process function, for
example) tell specific fast consumers to slow down a bit? Is something like
a callback possible in Flink? I don't think so, but..?

3. Or is there already anything in Flink that can help to synchronize
minimum timestamps between consumers?

4. Is there any good approach to slowing down consumption in the Kafka
consumer? I think there would be some problems between the session timeout
and poll, or something related to that, but maybe there is already a good
solution for that :)
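
The "shared place" idea from this message can be pictured as a small
coordinator that records how far each consumer has progressed in event time
and tells a consumer to hold back when it is too far ahead of the slowest
one. The following is only a minimal sketch in plain Java: it is not Flink or
Kafka API, the names SkewGate, report, mayProceed and maxSkewMillis are
hypothetical, and it assumes all consumers live in one JVM (a real cluster
would need an external store for the shared state).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Flink or Kafka: tracks the event-time
// progress of each consumer and reports whether one is too far ahead.
class SkewGate {
    private final Map<Integer, Long> progress = new ConcurrentHashMap<>();
    private final long maxSkewMillis;

    SkewGate(long maxSkewMillis) {
        this.maxSkewMillis = maxSkewMillis;
    }

    // Record the latest event timestamp emitted for a partition.
    // Progress never moves backwards.
    void report(int partition, long eventTimestamp) {
        progress.merge(partition, eventTimestamp, Long::max);
    }

    // Lowest progress across all partitions that have reported so far,
    // or Long.MIN_VALUE if nothing has been reported yet.
    long minProgress() {
        return progress.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    // A consumer may emit its next record only if that keeps it within
    // maxSkewMillis of the slowest partition.
    boolean mayProceed(long nextEventTimestamp) {
        long min = minProgress();
        return min == Long.MIN_VALUE || nextEventTimestamp - min <= maxSkewMillis;
    }
}
```

One caveat of this design: a partition that receives no data at all would
hold every other consumer back indefinitely, so some idle timeout would be
needed. On the Kafka side, KafkaConsumer has pause and resume methods that
stop fetching for chosen partitions while poll keeps being called; whether
and how the Flink connector could make use of them is not something this
sketch assumes.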

I will be glad if somebody can give some hints on any of the questions.

Best,
Sasha

Re: Kafka consumer are too fast for some partitions in "flatMap" like jobs

Posted by Elias Levy <fe...@gmail.com>.
On Wed, Aug 30, 2017 at 11:50 AM, Oleksandr Baliev <
aleksanderbalev@gmail.com> wrote:

>
> So the main question is how to synchronize data reading between Kafka
> partitions when data is sequential per partition but late for some of
> them, and we care that the data is not thrown away and is fully
> processed for some time range (window) later? :) It's more about manually
> handling consumption at the Kafka fetch level, and FlinkKafka* is too
> high level for that, isn't it?
>


At some point you have to give up on late data and drop it if you are
performing some window computation.  That said, that could be a long time,
allowing for very out of order data. Presumably most data won't be late,
and you want to output preliminary results to have timely data.  In that
case you want to implement a window trigger that fires early at regular
intervals without purging if it has received new events since the last time
it fired and purges the data once the allowed lateness time passes.

For instance, see this EventTimeTriggerWithEarlyAndLateFiring
<https://gist.github.com/eliaslevy/ec840444607b9a5dd5aa3eb2cdd77932> in
Java or this simplified EarlyFiringEventTimeTrigger
<https://gist.github.com/eliaslevy/43eca44e92fdef44e6717c60ea46e4d2> in
Scala.
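
To make the trigger behaviour described above concrete without depending on
the linked gists, here is a minimal plain-Java model of its decisions. It
does not use Flink's Trigger API and is not taken from the gists; the class
and method names (EarlyFiringModel, onElement, onEarlyTimer, onWatermark) are
hypothetical and only mirror the description: fire at regular intervals
without purging if new events arrived since the last firing, and purge once
the allowed lateness has passed.

```java
// Plain-Java model of the trigger decisions described above. It is not
// Flink's Trigger API; all names here are hypothetical.
class EarlyFiringModel {
    enum Result { CONTINUE, FIRE, PURGE }

    private final long windowEnd;
    private final long allowedLateness;
    private boolean newEventsSinceLastFire = false;

    EarlyFiringModel(long windowEnd, long allowedLateness) {
        this.windowEnd = windowEnd;
        this.allowedLateness = allowedLateness;
    }

    // Called for every element assigned to the window.
    Result onElement() {
        newEventsSinceLastFire = true;
        return Result.CONTINUE;
    }

    // Called each time the periodic early-firing timer goes off.
    // Fires (keeping the window contents) only if something new arrived.
    Result onEarlyTimer() {
        if (!newEventsSinceLastFire) {
            return Result.CONTINUE;
        }
        newEventsSinceLastFire = false;
        return Result.FIRE;
    }

    // Called when the watermark advances. Once the watermark passes the
    // window end plus the allowed lateness, the window data is purged.
    Result onWatermark(long watermark) {
        if (watermark >= windowEnd + allowedLateness) {
            return Result.PURGE;
        }
        return Result.CONTINUE;
    }
}
```

With a window ending at 100 and an allowed lateness of 50, an element that
arrives while the watermark is at 120 still causes the next early timer to
fire with an updated result; only a watermark of 150 or more purges the
window.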

Re: Kafka consumer are too fast for some partitions in "flatMap" like jobs

Posted by Oleksandr Baliev <al...@gmail.com>.
Hi Elias,

Thanks for the reply. TOPIC_OUT has fewer partitions, ~20, but actually
there are 4 output topics with different numbers of partitions. So the job
is a kind of router.

In general, having 1:1 partitions for the IN and OUT topics is good, thanks
for the tip. But since the main goal is to have windows in the next job,
which consumes data from TOPIC_OUT, data will be sequential within each
partition but not evenly distributed (some partitions will have more late
data than others). So when a window is applied and some data from one of
the 30 partitions (if TOPIC_OUT is set up with 30 partitions) arrives with a
timestamp that is outside the range the watermark extractor allows, the
window will already be closed, and the data will be counted as late.

Take the following example with 2 partitions (not sure if the indentation
will be okay in your mail, so sorry in advance):

PARTITION - TOPIC_IN data (timestamps) - TOPIC_OUT after mapping (timestamps)
1         - t5, t4, t3, t2, t1         - t5, t4, t3, t2, t1
2         - t5, t4, t3, t2, t1         - t3, t2, t1

I.e. there was some lag in the consumer/producer for partition 2, so its
data arrives in TOPIC_OUT later.
It means that when another job reads data from TOPIC_OUT, it will read
p1t1 (the entry with timestamp t1 from partition 1), and the appropriate
window will be created, i.e. w"t0-t1" (entries with timestamps from t0 to t1
are taken into account). Then the p1t2 entry comes, window w"t0-t1" is
closed and window w"t1-t2" is created, and so on until entry p2t1 arrives
and is counted as late data. I think that should be a very common situation
for such tasks. I don't take the BoundedOutOfOrdernessTimestampExtractor or
allowedLateness parameters into account, because they can only tune these
things; late data will come anyway.
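
The effect in this example can be reproduced with a small plain-Java
simulation. It is only a sketch under stated assumptions: it does not use
Flink, the class LateDataDemo and its method are hypothetical, the watermark
is taken as "highest timestamp seen so far minus the bound" and updated after
every element, windows are tumbling, allowed lateness is zero, and an element
is treated as late when the last timestamp of its window is not greater than
the current watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not Flink code. Hypothetical names throughout.
class LateDataDemo {
    // Returns the timestamps that would be counted as late, given the order
    // in which elements arrive, the tumbling window size, and the
    // out-of-orderness bound used for the watermark.
    static List<Long> lateEvents(long[] arrivalOrder, long windowSize, long bound) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            if (maxSeen != Long.MIN_VALUE) {
                long watermark = maxSeen - bound;
                // End (exclusive) of the tumbling window that contains ts.
                long windowEnd = ts - Math.floorMod(ts, windowSize) + windowSize;
                // Late if the window's last timestamp is already covered
                // by the watermark.
                if (windowEnd - 1 <= watermark) {
                    late.add(ts);
                }
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }
}
```

Writing t1..t5 as 1..5 and using windows of size 1, the arrival order from
the example (partition 1 up to t5, then partition 2 from t1 to t3) is
{1, 2, 3, 4, 5, 1, 2, 3}. With a bound of 0 all three entries from partition
2 are late; with a bound of 5 none are. So the bound does remove late data,
but only when it is at least as large as the lag between partitions, which
is the tuning problem described above.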

Hm... probably I'm just not thinking about the problem in isolation, i.e. I
see it as "TOPIC_IN -> map job -> TOPIC_OUT -> another job with window". But
I think "TOPIC_OUT -> another job with window" should be separated from this
pipeline and reviewed as a job with late data, which is a common question
for parallel data sources where some parts are just slow.

So the main question is how to synchronize data reading between Kafka
partitions when data is sequential per partition but late for some of
them, and we care that the data is not thrown away and is fully
processed for some time range (window) later? :) It's more about manually
handling consumption at the Kafka fetch level, and FlinkKafka* is too
high level for that, isn't it?

Also, as I understand it, the watermark is the only thing in Flink that can
somehow be applied to this task, as it's a global metric, but it is used
only by the window mechanism, which is more about the window assigner
(how/when to create a window) and triggers (how/when to close a window). It
cannot say to a consumer: please wait a bit, because I know that some data
is still there, so let's check all sources first and only then I'll be
closed, or maybe not.. Maybe the assigner could somehow not create a window
and just wait for some condition, but then again there are the problems with
stopping the consumer. Eh..
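
One way to get the "check all sources first" behaviour without stopping any
consumer is to let the slowest partition decide the watermark. As far as I
know, Flink's Kafka consumer can track watermarks per partition when the
timestamp/watermark assigner is attached to the consumer itself, and it then
emits the minimum; that is a pointer to check against the documentation, not
something confirmed in this thread. The plain-Java sketch below only
illustrates the minimum rule; MinWatermarkTracker and its methods are
hypothetical names and no Flink API is used.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of combining per-partition watermarks by taking
// the minimum. Hypothetical class, not Flink API.
class MinWatermarkTracker {
    private final Map<Integer, Long> perPartition = new HashMap<>();
    private final int expectedPartitions;

    MinWatermarkTracker(int expectedPartitions) {
        this.expectedPartitions = expectedPartitions;
    }

    // Update one partition's watermark (it never moves backwards) and
    // return the combined watermark.
    long update(int partition, long watermark) {
        perPartition.merge(partition, watermark, Long::max);
        return combined();
    }

    // The combined watermark is the minimum over all partitions. Until
    // every expected partition has reported, no watermark is available.
    long combined() {
        if (perPartition.size() < expectedPartitions) {
            return Long.MIN_VALUE;
        }
        return Collections.min(perPartition.values());
    }
}
```

With this rule a fast partition can run ahead as far as it likes: windows
stay open until the lagging partition catches up, at the cost of buffering
the fast partition's data in window state in the meantime.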

Best,
Sasha

2017-08-29 18:10 GMT+02:00 Elias Levy <fe...@gmail.com>:

> How many partitions does the output topic have?  If it has the same number
> of partitions as the input topic (30), have you considered simply using a
> custom partitioner for the Kafka sink that uses the input partition number
> as the output partition number?  If the input messages are ordered per
> input partition, that would guarantee their order in the output partitions.

Re: Kafka consumer are too fast for some partitions in "flatMap" like jobs

Posted by Elias Levy <fe...@gmail.com>.
How many partitions does the output topic have?  If it has the same number
of partitions as the input topic (30), have you considered simply using a
custom partitioner for the Kafka sink that uses the input partition number
as the output partition number?  If the input messages are ordered per
input partition, that would guarantee their order in the output partitions.
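
The mapping itself is small. The sketch below shows it in plain Java rather
than as a Flink partitioner, so PartitionMapping and targetPartition are
hypothetical names; it also assumes each record carries the number of the
input partition it was read from, which the job would have to capture when
deserializing.

```java
// Plain-Java sketch of the partition mapping, not a Flink partitioner.
// Hypothetical names; assumes the record's input partition is known.
class PartitionMapping {
    // Map an input partition number onto one of the available output
    // partitions. With equal partition counts this is a 1:1 mapping; with
    // fewer output partitions, several inputs share one output.
    static int targetPartition(int inputPartition, int[] outputPartitions) {
        if (outputPartitions.length == 0) {
            throw new IllegalArgumentException("no output partitions");
        }
        return outputPartitions[Math.floorMod(inputPartition, outputPartitions.length)];
    }
}
```

Note that the ordering guarantee depends on the 1:1 case. If the output
topic has fewer partitions than the input topic, two input partitions land
in the same output partition and their records interleave, so each input's
order is preserved but the output partition as a whole is no longer ordered
by timestamp.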
