You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Niklas Lönn <ni...@gmail.com> on 2019/01/22 12:38:08 UTC

Open files clogging and KafkaStreams

Hi Kafka Devs & Users,

We recently had an issue where we processed a lot of old data and we
crashed our brokers due to too many memory mapped files.
It seems to me that the nature of Kafka / Kafka Streams is a bit suboptimal
in terms of resource management. (Keeping all files open all the time,
maybe there should be something managing this more on-demand?)

In the issue I described, the repartition topic was produced very fast, but
not consumed, causing a lot of segments and files to be open at the same
time.

I have worked around the issue by making sure I have more threads than
partitions to force tasks to subscribe to internal topics only, but seems a
bit hacky and maybe there should be some guidance in documentation if
considered part of design..

After quite some testing and code reversing it seems that the nature of
this imbalance lies within how the broker multiplexes the consumed
topic-partitions.

I have attached a slide that I will present to my team to explain the issue
in a bit more detail, it might be good to check it out to understand the
context.

Any thoughts about my findings and concerns?

Kind regards
Niklas

Re: Open files clogging and KafkaStreams

Posted by Guozhang Wang <wa...@gmail.com>.
Not a problem. Glad that you've not seen it anymore now.

If it occurs again please feel free to reach out to the community again.


Guozhang

On Thu, Jan 24, 2019 at 2:32 PM Niklas Lönn <ni...@gmail.com> wrote:

> Hi.
>
> I have something good (and personally mysterious) to report.
>
> We do indeed run 1.1.x in production.
>
>  And today when I was almost finished cleaning up my test case for public
> display, I had been forced by corp policies to update osx, and suddenly
> when I had my test in a "non hacky improvised piece of iteration test code
> not asserting stuff" mode, I couldn't recreate the issue any more, not with
> the new or the old code.
>
> I suspect I was unlucky to hit some other issue in my os/firmware having
> very similar symptoms as we had in production, ran my test on another
> computer without this update and it was fine there as well.
>
> I guess that concludes that you are most likely very right with this 1.1
> bug and I was super unlucky to be able to recreate it locally due to other
> issues.
>
> Thanks for the support and rubber ducking :)
>
> Kind regards
> Niklas
>
> On Thu 24. Jan 2019 at 02:08, Guozhang Wang <wa...@gmail.com> wrote:
>
> > I see (btw attachments are usually not allowed in AK mailing list, but if
> > you have it somewhere like gitcode and can share the url that works).
> >
> > Could you let me know how many physical cores do you have in total
> hosting
> > your app and how many threads did you configure? From your current
> > description there should have at least 40 tasks (20 reading from source
> > topics and writing to repartition topics, and 20 reading from repartition
> > topics), and I'd like to know how are these tasks be assigned to threads,
> > and how many threads may be executed in parallel from the hardware.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn <ni...@gmail.com>
> wrote:
> >
> > > I have to double check what version of broker we run in production but
> > when
> > > testing and verifying the issue locally I did reproduce it with both
> > broker
> > > and client version 2.1.0
> > >
> > > Kind regards
> > > Niklas
> > >
> > > On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wa...@gmail.com>
> wrote:
> > >
> > > > I see.
> > > >
> > > > What you described is a known issue in the older version of Kafka,
> that
> > > > some high traffic topics in the bootstrap mode may effectively
> "starve"
> > > > other topics in the fetch response, since brokers used to naively
> fill
> > in
> > > > the bytes that meets the max.bytes configuration and returns. This is
> > > fixed
> > > > in 1.1 version via incremental fetch request:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > >
> > > > The basic idea is to not always request topics like A,B,C; instead if
> > the
> > > > previous request asks for topics A,B,C and got all data from A, then
> > next
> > > > request would be B,C,A, etc. So if you are on older versions of Kafka
> > I'd
> > > > suggest you upgrade to newer version.
> > > >
> > > > If you cannot upgrade atm, another suggest as I mentioned above is to
> > > > change the segment sizes so you can have much larger, and hence fewer
> > > > segment files.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <ni...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > I think I went a bit ahead of myself in describing the situation, I
> > had
> > > > an
> > > > > attachment with the context in detail, maybe it was filtered out.
> > Lets
> > > > try
> > > > > again =)
> > > > >
> > > > > We have a topology looking something like this:
> > > > >
> > > > > input-topic[20 partitions, compacted]
> > > > >     |
> > > > > use-case-repartition[20 partitions, infinite retention, segment.ms
> > > > =10min]
> > > > >     |
> > > > > use-case-changelog
> > > > >
> > > > > We have previously hit the TooManyOpenFiles issue and "solved" it
> by
> > > > > raising the bar to something extreme.
> > > > > Later we found out that we wanted rep factor 3 on all internal
> > topics,
> > > so
> > > > > we reset the app and BOOM, now we hit a too many memory mapped
> files
> > > > limit
> > > > > instead
> > > > >
> > > > > the input topic contains 30 days of data, where we pretty much have
> > > > records
> > > > > in every 10minute window for every partition.
> > > > > This means if nothing consumes the repartition topic we will have 6
> > (10
> > > > min
> > > > > slots) * 24 hours * 30 days * 20 partitions * 3 (.index .log
> > .timeindex
> > > > > files) * 3 replication factor / 5 brokers in cluster = *155.520
> *open
> > > > files
> > > > > just to have this repartition topic in place.
> > > > >
> > > > > You would say, yeah but no problem as it would be deleted and you
> > would
> > > > not
> > > > > reach such high numbers? But doesn't seem to be the case.
> > > > > What happened in our case is that, due to how the broker
> multiplexes
> > > the
> > > > > topic partitions for the subscribers, the streams application piled
> > up
> > > > all
> > > > > the repartition records, and only when caught up, all the
> downstream
> > > > > processes started taking place. I do see this as a design flaw in
> > some
> > > > > component, probably the broker. It cant be the desired behaviour.
> How
> > > > many
> > > > > open files do I need to be able to have open in a year of data when
> > > > > resetting/reprocessing an application?
> > > > >
> > > > > By adding more threads than input topic partitions, I managed to
> > force
> > > > the
> > > > > broker to give out these records earlier and issue was mitigated.
> > > > >
> > > > > Ideally the downstream records should be processed somewhere near
> in
> > > time
> > > > > as the source record.
> > > > >
> > > > > Lets take one partition, containing 1.000.000 records this is the
> > > > observed
> > > > > behaviour I have seen: (Somewhat simplified)
> > > > >
> > > > > Time     Consumer offset Input topic     Records in input topic
> > > > >  Consumer offset repartition topic     Records in repartition topic
> > > > > 00:00    0                                               1.000.000
> > > > >              0
> >   0
> > > > > 00:01    100.000                                    1.000.000
> > > > >          0
> > > 100.000
> > > > > 00:02    200.000                                    1.000.000
> > > > >          0
> > > 200.000
> > > > > 00:03    300.000                                    1.000.000
> > > > >          0
> > > 300.000
> > > > > 00:04    400.000                                    1.000.000
> > > > >          0
> > > 400.000
> > > > > 00:05    500.000                                    1.000.000
> > > > >          0
> > > 500.000
> > > > > 00:06    600.000                                    1.000.000
> > > > >          0
> > > 600.000
> > > > > 00:07    700.000                                    1.000.000
> > > > >          0
> > > 700.000
> > > > > 00:08    800.000                                    1.000.000
> > > > >          0
> > > 800.000
> > > > > 00:09    900.000                                    1.000.000
> > > > >          0
> > > 900.000
> > > > > 00:10    1.000.000                                 1.000.000
> > > > >        0
> > > 1000.000
> > > > > 00:11    1.000.000                                 1.000.000
> > > > >        100.000                                             1000.000
> > > > > 00:12    1.000.000                                 1.000.000
> > > > >        200.000                                             1000.000
> > > > > 00:13    1.000.000                                 1.000.000
> > > > >        300.000                                             1000.000
> > > > > 00:14    1.000.000                                 1.000.000
> > > > >        400.000                                             1000.000
> > > > > 00:15    1.000.000                                 1.000.000
> > > > >        500.000                                             1000.000
> > > > > 00:16    1.000.000                                 1.000.000
> > > > >        600.000                                             1000.000
> > > > > 00:17    1.000.000                                 1.000.000
> > > > >        700.000                                             1000.000
> > > > > 00:18    1.000.000                                 1.000.000
> > > > >        800.000                                             1000.000
> > > > > 00:19    1.000.000                                 1.000.000
> > > > >        900.000                                             1000.000
> > > > > 00:20    1.000.000                                 1.000.000
> > > > >        1.000.000                                          1000.000
> > > > >
> > > > > As you can see, there is no parallel execution and its due to that
> > the
> > > > > broker does not give any records from repartition topic until input
> > > topic
> > > > > is depleted.
> > > > > By adding more threads than input partitions I managed to mitigate
> > this
> > > > > behaviour somewhat, but still not close to balanced.
> > > > >
> > > > > Ideally in such a situation where we rebuild stream states, I would
> > > more
> > > > > expect a behaviour like this:
> > > > >
> > > > > Time     Consumer offset Input topic     Records in input topic
> > > > >  Consumer offset repartition topic     Records in repartition topic
> > > > > 00:00    0                                               1.000.000
> > > > >              0
> >   0
> > > > > 00:01    100.000                                    1.000.000
> > > > >          0
> > > 100.000
> > > > > 00:02    200.000                                    1.000.000
> > > > >          100.000
>  200.000
> > > > > 00:03    300.000                                    1.000.000
> > > > >          200.000
>  300.000
> > > > > 00:04    400.000                                    1.000.000
> > > > >          300.000
>  400.000
> > > > > 00:05    500.000                                    1.000.000
> > > > >          400.000
>  500.000
> > > > > 00:06    600.000                                    1.000.000
> > > > >          500.000
>  600.000
> > > > > 00:07    700.000                                    1.000.000
> > > > >          600.000
>  700.000
> > > > > 00:08    800.000                                    1.000.000
> > > > >          700.000
>  800.000
> > > > > 00:09    900.000                                    1.000.000
> > > > >          800.000
>  900.000
> > > > > 00:10    1.000.000                                 1.000.000
> > > > >        900.000                                             1000.000
> > > > > 00:10    1.000.000                                 1.000.000
> > > > >        1.000.000                                          1000.000
> > > > >
> > > > >
> > > > > Kind regards
> > > > > Niklas
> > > > >
> > > > > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello Niklas,
> > > > > >
> > > > > > If you can monitor your repartition topic's consumer lag, and it
> > was
> > > > > > increasing consistently, it means your downstream processor
> cannot
> > > > simply
> > > > > > keep up with the throughput of the upstream processor. Usually it
> > > means
> > > > > > your downstream operators is heavier (e.g. aggregations, joins
> that
> > > are
> > > > > all
> > > > > > stateful) than your upstreams (e.g. simply for shuffling the data
> > to
> > > > > > repartition topics), and since tasks assignment only consider a
> > task
> > > as
> > > > > the
> > > > > > smallest unit of work and did not differentiate "heavy" and
> "light"
> > > > > tasks,
> > > > > > such imbalance of task assignment may happen. At the moment, to
> > > resolve
> > > > > > this you should add more resources to make sure the heavy tasks
> get
> > > > > enough
> > > > > > computational resource assigned (more threads, e.g.).
> > > > > >
> > > > > > If your observed consumer lag stays plateau after increasing to
> > some
> > > > > point,
> > > > > > it means your consumer can actually keep up with some constant
> lag;
> > > if
> > > > > you
> > > > > > hit your open file limits before seeing this, it means you either
> > > need
> > > > to
> > > > > > increase your open file limits, OR you can simply increase the
> > > segment
> > > > > size
> > > > > > to reduce num. files via "StreamsConfig.TOPIC_PREFIX"to set the
> > value
> > > > of
> > > > > > TopicConfig.SEGMENT_BYTES_CONFIG.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <
> niklas.lonn@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Kafka Devs & Users,
> > > > > > >
> > > > > > > We recently had an issue where we processed a lot of old data
> and
> > > we
> > > > > > > crashed our brokers due to too many memory mapped files.
> > > > > > > It seems to me that the nature of Kafka / Kafka Streams is a
> bit
> > > > > > > suboptimal in terms of resource management. (Keeping all files
> > open
> > > > all
> > > > > > the
> > > > > > > time, maybe there should be something managing this more
> > > on-demand?)
> > > > > > >
> > > > > > > In the issue I described, the repartition topic was produced
> very
> > > > fast,
> > > > > > > but not consumed, causing a lot of segments and files to be
> open
> > at
> > > > the
> > > > > > > same time.
> > > > > > >
> > > > > > > I have worked around the issue by making sure I have more
> threads
> > > > than
> > > > > > > partitions to force tasks to subscribe to internal topics only,
> > but
> > > > > > seems a
> > > > > > > bit hacky and maybe there should be some guidance in
> > documentation
> > > if
> > > > > > > considered part of design..
> > > > > > >
> > > > > > > After quite some testing and code reversing it seems that the
> > > nature
> > > > of
> > > > > > > this imbalance lies within how the broker multiplexes the
> > consumed
> > > > > > > topic-partitions.
> > > > > > >
> > > > > > > I have attached a slide that I will present to my team to
> explain
> > > the
> > > > > > > issue in a bit more detail, it might be good to check it out to
> > > > > > understand
> > > > > > > the context.
> > > > > > >
> > > > > > > Any thoughts about my findings and concerns?
> > > > > > >
> > > > > > > Kind regards
> > > > > > > Niklas
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Open files clogging and KafkaStreams

Posted by Niklas Lönn <ni...@gmail.com>.
Hi.

I have something good (and personally mysterious) to report.

We do indeed run 1.1.x in production.

 And today when I was almost finished cleaning up my test case for public
display, I had been forced by corp policies to update osx, and suddenly
when I had my test in a "non hacky improvised piece of iteration test code
not asserting stuff" mode, I couldn't recreate the issue any more, not with
the new or the old code.

I suspect I was unlucky to hit some other issue in my os/firmware having
very similar symptoms as we had in production, ran my test on another
computer without this update and it was fine there as well.

I guess that concludes that you are most likely very right with this 1.1
bug and I was super unlucky to be able to recreate it locally due to other
issues.

Thanks for the support and rubber ducking :)

Kind regards
Niklas

On Thu 24. Jan 2019 at 02:08, Guozhang Wang <wa...@gmail.com> wrote:

> I see (btw attachments are usually not allowed in AK mailing list, but if
> you have it somewhere like gitcode and can share the url that works).
>
> Could you let me know how many physical cores do you have in total hosting
> your app and how many threads did you configure? From your current
> description there should have at least 40 tasks (20 reading from source
> topics and writing to repartition topics, and 20 reading from repartition
> topics), and I'd like to know how are these tasks be assigned to threads,
> and how many threads may be executed in parallel from the hardware.
>
>
> Guozhang
>
>
> On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn <ni...@gmail.com> wrote:
>
> > I have to double check what version of broker we run in production but
> when
> > testing and verifying the issue locally I did reproduce it with both
> broker
> > and client version 2.1.0
> >
> > Kind regards
> > Niklas
> >
> > On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > I see.
> > >
> > > What you described is a known issue in the older version of Kafka, that
> > > some high traffic topics in the bootstrap mode may effectively "starve"
> > > other topics in the fetch response, since brokers used to naively fill
> in
> > > the bytes that meets the max.bytes configuration and returns. This is
> > fixed
> > > in 1.1 version via incremental fetch request:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > >
> > > The basic idea is to not always request topics like A,B,C; instead if
> the
> > > previous request asks for topics A,B,C and got all data from A, then
> next
> > > request would be B,C,A, etc. So if you are on older versions of Kafka
> I'd
> > > suggest you upgrade to newer version.
> > >
> > > If you cannot upgrade atm, another suggest as I mentioned above is to
> > > change the segment sizes so you can have much larger, and hence fewer
> > > segment files.
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <ni...@gmail.com>
> > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > I think I went a bit ahead of myself in describing the situation, I
> had
> > > an
> > > > attachment with the context in detail, maybe it was filtered out.
> Lets
> > > try
> > > > again =)
> > > >
> > > > We have a topology looking something like this:
> > > >
> > > > input-topic[20 partitions, compacted]
> > > >     |
> > > > use-case-repartition[20 partitions, infinite retention, segment.ms
> > > =10min]
> > > >     |
> > > > use-case-changelog
> > > >
> > > > We have previously hit the TooManyOpenFiles issue and "solved" it by
> > > > raising the bar to something extreme.
> > > > Later we found out that we wanted rep factor 3 on all internal
> topics,
> > so
> > > > we reset the app and BOOM, now we hit a too many memory mapped files
> > > limit
> > > > instead
> > > >
> > > > the input topic contains 30 days of data, where we pretty much have
> > > records
> > > > in every 10minute window for every partition.
> > > > This means if nothing consumes the repartition topic we will have 6
> (10
> > > min
> > > > slots) * 24 hours * 30 days * 20 partitions * 3 (.index .log
> .timeindex
> > > > files) * 3 replication factor / 5 brokers in cluster = *155.520 *open
> > > files
> > > > just to have this repartition topic in place.
> > > >
> > > > You would say, yeah but no problem as it would be deleted and you
> would
> > > not
> > > > reach such high numbers? But doesn't seem to be the case.
> > > > What happened in our case is that, due to how the broker multiplexes
> > the
> > > > topic partitions for the subscribers, the streams application piled
> up
> > > all
> > > > the repartition records, and only when caught up, all the downstream
> > > > processes started taking place. I do see this as a design flaw in
> some
> > > > component, probably the broker. It cant be the desired behaviour. How
> > > many
> > > > open files do I need to be able to have open in a year of data when
> > > > resetting/reprocessing an application?
> > > >
> > > > By adding more threads than input topic partitions, I managed to
> force
> > > the
> > > > broker to give out these records earlier and issue was mitigated.
> > > >
> > > > Ideally the downstream records should be processed somewhere near in
> > time
> > > > as the source record.
> > > >
> > > > Lets take one partition, containing 1.000.000 records this is the
> > > observed
> > > > behaviour I have seen: (Somewhat simplified)
> > > >
> > > > Time     Consumer offset Input topic     Records in input topic
> > > >  Consumer offset repartition topic     Records in repartition topic
> > > > 00:00    0                                               1.000.000
> > > >              0
>   0
> > > > 00:01    100.000                                    1.000.000
> > > >          0
> > 100.000
> > > > 00:02    200.000                                    1.000.000
> > > >          0
> > 200.000
> > > > 00:03    300.000                                    1.000.000
> > > >          0
> > 300.000
> > > > 00:04    400.000                                    1.000.000
> > > >          0
> > 400.000
> > > > 00:05    500.000                                    1.000.000
> > > >          0
> > 500.000
> > > > 00:06    600.000                                    1.000.000
> > > >          0
> > 600.000
> > > > 00:07    700.000                                    1.000.000
> > > >          0
> > 700.000
> > > > 00:08    800.000                                    1.000.000
> > > >          0
> > 800.000
> > > > 00:09    900.000                                    1.000.000
> > > >          0
> > 900.000
> > > > 00:10    1.000.000                                 1.000.000
> > > >        0
> > 1000.000
> > > > 00:11    1.000.000                                 1.000.000
> > > >        100.000                                             1000.000
> > > > 00:12    1.000.000                                 1.000.000
> > > >        200.000                                             1000.000
> > > > 00:13    1.000.000                                 1.000.000
> > > >        300.000                                             1000.000
> > > > 00:14    1.000.000                                 1.000.000
> > > >        400.000                                             1000.000
> > > > 00:15    1.000.000                                 1.000.000
> > > >        500.000                                             1000.000
> > > > 00:16    1.000.000                                 1.000.000
> > > >        600.000                                             1000.000
> > > > 00:17    1.000.000                                 1.000.000
> > > >        700.000                                             1000.000
> > > > 00:18    1.000.000                                 1.000.000
> > > >        800.000                                             1000.000
> > > > 00:19    1.000.000                                 1.000.000
> > > >        900.000                                             1000.000
> > > > 00:20    1.000.000                                 1.000.000
> > > >        1.000.000                                          1000.000
> > > >
> > > > As you can see, there is no parallel execution and its due to that
> the
> > > > broker does not give any records from repartition topic until input
> > topic
> > > > is depleted.
> > > > By adding more threads than input partitions I managed to mitigate
> this
> > > > behaviour somewhat, but still not close to balanced.
> > > >
> > > > Ideally in such a situation where we rebuild stream states, I would
> > more
> > > > expect a behaviour like this:
> > > >
> > > > Time     Consumer offset Input topic     Records in input topic
> > > >  Consumer offset repartition topic     Records in repartition topic
> > > > 00:00    0                                               1.000.000
> > > >              0
>   0
> > > > 00:01    100.000                                    1.000.000
> > > >          0
> > 100.000
> > > > 00:02    200.000                                    1.000.000
> > > >          100.000                                             200.000
> > > > 00:03    300.000                                    1.000.000
> > > >          200.000                                             300.000
> > > > 00:04    400.000                                    1.000.000
> > > >          300.000                                             400.000
> > > > 00:05    500.000                                    1.000.000
> > > >          400.000                                             500.000
> > > > 00:06    600.000                                    1.000.000
> > > >          500.000                                             600.000
> > > > 00:07    700.000                                    1.000.000
> > > >          600.000                                             700.000
> > > > 00:08    800.000                                    1.000.000
> > > >          700.000                                             800.000
> > > > 00:09    900.000                                    1.000.000
> > > >          800.000                                             900.000
> > > > 00:10    1.000.000                                 1.000.000
> > > >        900.000                                             1000.000
> > > > 00:10    1.000.000                                 1.000.000
> > > >        1.000.000                                          1000.000
> > > >
> > > >
> > > > Kind regards
> > > > Niklas
> > > >
> > > > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Niklas,
> > > > >
> > > > > If you can monitor your repartition topic's consumer lag, and it
> was
> > > > > increasing consistently, it means your downstream processor cannot
> > > simply
> > > > > keep up with the throughput of the upstream processor. Usually it
> > means
> > > > > your downstream operators is heavier (e.g. aggregations, joins that
> > are
> > > > all
> > > > > stateful) than your upstreams (e.g. simply for shuffling the data
> to
> > > > > repartition topics), and since tasks assignment only consider a
> task
> > as
> > > > the
> > > > > smallest unit of work and did not differentiate "heavy" and "light"
> > > > tasks,
> > > > > such imbalance of task assignment may happen. At the moment, to
> > resolve
> > > > > this you should add more resources to make sure the heavy tasks get
> > > > enough
> > > > > computational resource assigned (more threads, e.g.).
> > > > >
> > > > > If your observed consumer lag stays plateau after increasing to
> some
> > > > point,
> > > > > it means your consumer can actually keep up with some constant lag;
> > if
> > > > you
> > > > > hit your open file limits before seeing this, it means you either
> > need
> > > to
> > > > > increase your open file limits, OR you can simply increase the
> > segment
> > > > size
> > > > > to reduce num. files via "StreamsConfig.TOPIC_PREFIX"to set the
> value
> > > of
> > > > > TopicConfig.SEGMENT_BYTES_CONFIG.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.lonn@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi Kafka Devs & Users,
> > > > > >
> > > > > > We recently had an issue where we processed a lot of old data and
> > we
> > > > > > crashed our brokers due to too many memory mapped files.
> > > > > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > > > > suboptimal in terms of resource management. (Keeping all files
> open
> > > all
> > > > > the
> > > > > > time, maybe there should be something managing this more
> > on-demand?)
> > > > > >
> > > > > > In the issue I described, the repartition topic was produced very
> > > fast,
> > > > > > but not consumed, causing a lot of segments and files to be open
> at
> > > the
> > > > > > same time.
> > > > > >
> > > > > > I have worked around the issue by making sure I have more threads
> > > than
> > > > > > partitions to force tasks to subscribe to internal topics only,
> but
> > > > > seems a
> > > > > > bit hacky and maybe there should be some guidance in
> documentation
> > if
> > > > > > considered part of design..
> > > > > >
> > > > > > After quite some testing and code reversing it seems that the
> > nature
> > > of
> > > > > > this imbalance lies within how the broker multiplexes the
> consumed
> > > > > > topic-partitions.
> > > > > >
> > > > > > I have attached a slide that I will present to my team to explain
> > the
> > > > > > issue in a bit more detail, it might be good to check it out to
> > > > > understand
> > > > > > the context.
> > > > > >
> > > > > > Any thoughts about my findings and concerns?
> > > > > >
> > > > > > Kind regards
> > > > > > Niklas
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Open files clogging and KafkaStreams

Posted by Guozhang Wang <wa...@gmail.com>.
I see (btw attachments are usually not allowed in AK mailing list, but if
you have it somewhere like gitcode and can share the url that works).

Could you let me know how many physical cores do you have in total hosting
your app and how many threads did you configure? From your current
description there should have at least 40 tasks (20 reading from source
topics and writing to repartition topics, and 20 reading from repartition
topics), and I'd like to know how are these tasks be assigned to threads,
and how many threads may be executed in parallel from the hardware.


Guozhang


On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn <ni...@gmail.com> wrote:

> I have to double check what version of broker we run in production but when
> testing and verifying the issue locally I did reproduce it with both broker
> and client version 2.1.0
>
> Kind regards
> Niklas
>
> On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wa...@gmail.com> wrote:
>
> > I see.
> >
> > What you described is a known issue in the older version of Kafka, that
> > some high traffic topics in the bootstrap mode may effectively "starve"
> > other topics in the fetch response, since brokers used to naively fill in
> > the bytes that meets the max.bytes configuration and returns. This is
> fixed
> > in 1.1 version via incremental fetch request:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> >
> > The basic idea is to not always request topics like A,B,C; instead if the
> > previous request asks for topics A,B,C and got all data from A, then next
> > request would be B,C,A, etc. So if you are on older versions of Kafka I'd
> > suggest you upgrade to newer version.
> >
> > If you cannot upgrade atm, another suggest as I mentioned above is to
> > change the segment sizes so you can have much larger, and hence fewer
> > segment files.
> >
> > Guozhang
> >
> >
> > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <ni...@gmail.com>
> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I think I went a bit ahead of myself in describing the situation, I had
> > an
> > > attachment with the context in detail, maybe it was filtered out. Lets
> > try
> > > again =)
> > >
> > > We have a topology looking something like this:
> > >
> > > input-topic[20 partitions, compacted]
> > >     |
> > > use-case-repartition[20 partitions, infinite retention, segment.ms
> > =10min]
> > >     |
> > > use-case-changelog
> > >
> > > We have previously hit the TooManyOpenFiles issue and "solved" it by
> > > raising the bar to something extreme.
> > > Later we found out that we wanted rep factor 3 on all internal topics,
> so
> > > we reset the app and BOOM, now we hit a too many memory mapped files
> > limit
> > > instead
> > >
> > > the input topic contains 30 days of data, where we pretty much have
> > records
> > > in every 10minute window for every partition.
> > > This means if nothing consumes the repartition topic we will have 6 (10
> > min
> > > slots) * 24 hours * 30 days * 20 partitions * 3 (.index .log .timeindex
> > > files) * 3 replication factor / 5 brokers in cluster = *155.520 *open
> > files
> > > just to have this repartition topic in place.
> > >
> > > You would say, yeah but no problem as it would be deleted and you would
> > not
> > > reach such high numbers? But doesn't seem to be the case.
> > > What happened in our case is that, due to how the broker multiplexes
> the
> > > topic partitions for the subscribers, the streams application piled up
> > all
> > > the repartition records, and only when caught up, all the downstream
> > > processes started taking place. I do see this as a design flaw in some
> > > component, probably the broker. It cant be the desired behaviour. How
> > many
> > > open files do I need to be able to have open in a year of data when
> > > resetting/reprocessing an application?
> > >
> > > By adding more threads than input topic partitions, I managed to force
> > the
> > > broker to give out these records earlier and issue was mitigated.
> > >
> > > Ideally the downstream records should be processed somewhere near in
> time
> > > as the source record.
> > >
> > > Lets take one partition, containing 1.000.000 records this is the
> > observed
> > > behaviour I have seen: (Somewhat simplified)
> > >
> > > Time     Consumer offset Input topic     Records in input topic
> > >  Consumer offset repartition topic     Records in repartition topic
> > > 00:00    0                                               1.000.000
> > >              0                                                        0
> > > 00:01    100.000                                    1.000.000
> > >          0
> 100.000
> > > 00:02    200.000                                    1.000.000
> > >          0
> 200.000
> > > 00:03    300.000                                    1.000.000
> > >          0
> 300.000
> > > 00:04    400.000                                    1.000.000
> > >          0
> 400.000
> > > 00:05    500.000                                    1.000.000
> > >          0
> 500.000
> > > 00:06    600.000                                    1.000.000
> > >          0
> 600.000
> > > 00:07    700.000                                    1.000.000
> > >          0
> 700.000
> > > 00:08    800.000                                    1.000.000
> > >          0
> 800.000
> > > 00:09    900.000                                    1.000.000
> > >          0
> 900.000
> > > 00:10    1.000.000                                 1.000.000
> > >        0
> 1000.000
> > > 00:11    1.000.000                                 1.000.000
> > >        100.000                                             1000.000
> > > 00:12    1.000.000                                 1.000.000
> > >        200.000                                             1000.000
> > > 00:13    1.000.000                                 1.000.000
> > >        300.000                                             1000.000
> > > 00:14    1.000.000                                 1.000.000
> > >        400.000                                             1000.000
> > > 00:15    1.000.000                                 1.000.000
> > >        500.000                                             1000.000
> > > 00:16    1.000.000                                 1.000.000
> > >        600.000                                             1000.000
> > > 00:17    1.000.000                                 1.000.000
> > >        700.000                                             1000.000
> > > 00:18    1.000.000                                 1.000.000
> > >        800.000                                             1000.000
> > > 00:19    1.000.000                                 1.000.000
> > >        900.000                                             1000.000
> > > 00:20    1.000.000                                 1.000.000
> > >        1.000.000                                          1000.000
> > >
> > > As you can see, there is no parallel execution and its due to that the
> > > broker does not give any records from repartition topic until input
> topic
> > > is depleted.
> > > By adding more threads than input partitions I managed to mitigate this
> > > behaviour somewhat, but still not close to balanced.
> > >
> > > Ideally in such a situation where we rebuild stream states, I would
> more
> > > expect a behaviour like this:
> > >
> > > Time     Consumer offset Input topic     Records in input topic
> > >  Consumer offset repartition topic     Records in repartition topic
> > > 00:00    0                                               1.000.000
> > >              0                                                        0
> > > 00:01    100.000                                    1.000.000
> > >          0
> 100.000
> > > 00:02    200.000                                    1.000.000
> > >          100.000                                             200.000
> > > 00:03    300.000                                    1.000.000
> > >          200.000                                             300.000
> > > 00:04    400.000                                    1.000.000
> > >          300.000                                             400.000
> > > 00:05    500.000                                    1.000.000
> > >          400.000                                             500.000
> > > 00:06    600.000                                    1.000.000
> > >          500.000                                             600.000
> > > 00:07    700.000                                    1.000.000
> > >          600.000                                             700.000
> > > 00:08    800.000                                    1.000.000
> > >          700.000                                             800.000
> > > 00:09    900.000                                    1.000.000
> > >          800.000                                             900.000
> > > 00:10    1.000.000                                 1.000.000
> > >        900.000                                             1000.000
> > > 00:10    1.000.000                                 1.000.000
> > >        1.000.000                                          1000.000
> > >
> > >
> > > Kind regards
> > > Niklas
> > >
> > > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hello Niklas,
> > > >
> > > > If you can monitor your repartition topic's consumer lag, and it was
> > > > increasing consistently, it means your downstream processor cannot
> > simply
> > > > keep up with the throughput of the upstream processor. Usually it
> means
> > > > your downstream operators is heavier (e.g. aggregations, joins that
> are
> > > all
> > > > stateful) than your upstreams (e.g. simply for shuffling the data to
> > > > repartition topics), and since tasks assignment only consider a task
> as
> > > the
> > > > smallest unit of work and did not differentiate "heavy" and "light"
> > > tasks,
> > > > such imbalance of task assignment may happen. At the moment, to
> resolve
> > > > this you should add more resources to make sure the heavy tasks get
> > > enough
> > > > computational resource assigned (more threads, e.g.).
> > > >
> > > > If your observed consumer lag stays plateau after increasing to some
> > > point,
> > > > it means your consumer can actually keep up with some constant lag;
> if
> > > you
> > > > hit your open file limits before seeing this, it means you either
> need
> > to
> > > > increase your open file limits, OR you can simply increase the
> segment
> > > size
> > > > to reduce num. files via "StreamsConfig.TOPIC_PREFIX"to set the value
> > of
> > > > TopicConfig.SEGMENT_BYTES_CONFIG.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <ni...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Kafka Devs & Users,
> > > > >
> > > > > We recently had an issue where we processed a lot of old data and
> we
> > > > > crashed our brokers due to too many memory mapped files.
> > > > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > > > suboptimal in terms of resource management. (Keeping all files open
> > all
> > > > the
> > > > > time, maybe there should be something managing this more
> on-demand?)
> > > > >
> > > > > In the issue I described, the repartition topic was produced very
> > fast,
> > > > > but not consumed, causing a lot of segments and files to be open at
> > the
> > > > > same time.
> > > > >
> > > > > I have worked around the issue by making sure I have more threads
> > than
> > > > > partitions to force tasks to subscribe to internal topics only, but
> > > > seems a
> > > > > bit hacky and maybe there should be some guidance in documentation
> if
> > > > > considered part of design..
> > > > >
> > > > > After quite some testing and code reversing it seems that the
> nature
> > of
> > > > > this imbalance lies within how the broker multiplexes the consumed
> > > > > topic-partitions.
> > > > >
> > > > > I have attached a slide that I will present to my team to explain
> the
> > > > > issue in a bit more detail, it might be good to check it out to
> > > > understand
> > > > > the context.
> > > > >
> > > > > Any thoughts about my findings and concerns?
> > > > >
> > > > > Kind regards
> > > > > Niklas
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Open files clogging and KafkaStreams

Posted by Niklas Lönn <ni...@gmail.com>.
I have to double check what version of broker we run in production but when
testing and verifying the issue locally I did reproduce it with both broker
and client version 2.1.0

Kind regards
Niklas

On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wa...@gmail.com> wrote:

> I see.
>
> What you described is a known issue in the older version of Kafka, that
> some high traffic topics in the bootstrap mode may effectively "starve"
> other topics in the fetch response, since brokers used to naively fill in
> the bytes that meets the max.bytes configuration and returns. This is fixed
> in 1.1 version via incremental fetch request:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
>
> The basic idea is to not always request topics like A,B,C; instead if the
> previous request asks for topics A,B,C and got all data from A, then next
> request would be B,C,A, etc. So if you are on older versions of Kafka I'd
> suggest you upgrade to newer version.
>
> If you cannot upgrade atm, another suggest as I mentioned above is to
> change the segment sizes so you can have much larger, and hence fewer
> segment files.
>
> Guozhang
>
>
> On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <ni...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > I think I went a bit ahead of myself in describing the situation, I had
> an
> > attachment with the context in detail, maybe it was filtered out. Lets
> try
> > again =)
> >
> > We have a topology looking something like this:
> >
> > input-topic[20 partitions, compacted]
> >     |
> > use-case-repartition[20 partitions, infinite retention, segment.ms
> =10min]
> >     |
> > use-case-changelog
> >
> > We have previously hit the TooManyOpenFiles issue and "solved" it by
> > raising the bar to something extreme.
> > Later we found out that we wanted rep factor 3 on all internal topics, so
> > we reset the app and BOOM, now we hit a too many memory mapped files
> limit
> > instead
> >
> > the input topic contains 30 days of data, where we pretty much have
> records
> > in every 10minute window for every partition.
> > This means if nothing consumes the repartition topic we will have 6 (10
> min
> > slots) * 24 hours * 30 days * 20 partitions * 3 (.index .log .timeindex
> > files) * 3 replication factor / 5 brokers in cluster = *155.520 *open
> files
> > just to have this repartition topic in place.
> >
> > You would say, yeah but no problem as it would be deleted and you would
> not
> > reach such high numbers? But doesn't seem to be the case.
> > What happened in our case is that, due to how the broker multiplexes the
> > topic partitions for the subscribers, the streams application piled up
> all
> > the repartition records, and only when caught up, all the downstream
> > processes started taking place. I do see this as a design flaw in some
> > component, probably the broker. It cant be the desired behaviour. How
> many
> > open files do I need to be able to have open in a year of data when
> > resetting/reprocessing an application?
> >
> > By adding more threads than input topic partitions, I managed to force
> the
> > broker to give out these records earlier and issue was mitigated.
> >
> > Ideally the downstream records should be processed somewhere near in time
> > as the source record.
> >
> > Lets take one partition, containing 1.000.000 records this is the
> observed
> > behaviour I have seen: (Somewhat simplified)
> >
> > Time     Consumer offset Input topic     Records in input topic
> >  Consumer offset repartition topic     Records in repartition topic
> > 00:00    0                                               1.000.000
> >              0                                                        0
> > 00:01    100.000                                    1.000.000
> >          0                                                        100.000
> > 00:02    200.000                                    1.000.000
> >          0                                                        200.000
> > 00:03    300.000                                    1.000.000
> >          0                                                        300.000
> > 00:04    400.000                                    1.000.000
> >          0                                                        400.000
> > 00:05    500.000                                    1.000.000
> >          0                                                        500.000
> > 00:06    600.000                                    1.000.000
> >          0                                                        600.000
> > 00:07    700.000                                    1.000.000
> >          0                                                        700.000
> > 00:08    800.000                                    1.000.000
> >          0                                                        800.000
> > 00:09    900.000                                    1.000.000
> >          0                                                        900.000
> > 00:10    1.000.000                                 1.000.000
> >        0                                                        1000.000
> > 00:11    1.000.000                                 1.000.000
> >        100.000                                             1000.000
> > 00:12    1.000.000                                 1.000.000
> >        200.000                                             1000.000
> > 00:13    1.000.000                                 1.000.000
> >        300.000                                             1000.000
> > 00:14    1.000.000                                 1.000.000
> >        400.000                                             1000.000
> > 00:15    1.000.000                                 1.000.000
> >        500.000                                             1000.000
> > 00:16    1.000.000                                 1.000.000
> >        600.000                                             1000.000
> > 00:17    1.000.000                                 1.000.000
> >        700.000                                             1000.000
> > 00:18    1.000.000                                 1.000.000
> >        800.000                                             1000.000
> > 00:19    1.000.000                                 1.000.000
> >        900.000                                             1000.000
> > 00:20    1.000.000                                 1.000.000
> >        1.000.000                                          1000.000
> >
> > As you can see, there is no parallel execution and its due to that the
> > broker does not give any records from repartition topic until input topic
> > is depleted.
> > By adding more threads than input partitions I managed to mitigate this
> > behaviour somewhat, but still not close to balanced.
> >
> > Ideally in such a situation where we rebuild stream states, I would more
> > expect a behaviour like this:
> >
> > Time     Consumer offset Input topic     Records in input topic
> >  Consumer offset repartition topic     Records in repartition topic
> > 00:00    0                                               1.000.000
> >              0                                                        0
> > 00:01    100.000                                    1.000.000
> >          0                                                        100.000
> > 00:02    200.000                                    1.000.000
> >          100.000                                             200.000
> > 00:03    300.000                                    1.000.000
> >          200.000                                             300.000
> > 00:04    400.000                                    1.000.000
> >          300.000                                             400.000
> > 00:05    500.000                                    1.000.000
> >          400.000                                             500.000
> > 00:06    600.000                                    1.000.000
> >          500.000                                             600.000
> > 00:07    700.000                                    1.000.000
> >          600.000                                             700.000
> > 00:08    800.000                                    1.000.000
> >          700.000                                             800.000
> > 00:09    900.000                                    1.000.000
> >          800.000                                             900.000
> > 00:10    1.000.000                                 1.000.000
> >        900.000                                             1000.000
> > 00:10    1.000.000                                 1.000.000
> >        1.000.000                                          1000.000
> >
> >
> > Kind regards
> > Niklas
> >
> > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hello Niklas,
> > >
> > > If you can monitor your repartition topic's consumer lag, and it was
> > > increasing consistently, it means your downstream processor cannot
> simply
> > > keep up with the throughput of the upstream processor. Usually it means
> > > your downstream operators is heavier (e.g. aggregations, joins that are
> > all
> > > stateful) than your upstreams (e.g. simply for shuffling the data to
> > > repartition topics), and since tasks assignment only consider a task as
> > the
> > > smallest unit of work and did not differentiate "heavy" and "light"
> > tasks,
> > > such imbalance of task assignment may happen. At the moment, to resolve
> > > this you should add more resources to make sure the heavy tasks get
> > enough
> > > computational resource assigned (more threads, e.g.).
> > >
> > > If your observed consumer lag stays plateau after increasing to some
> > point,
> > > it means your consumer can actually keep up with some constant lag; if
> > you
> > > hit your open file limits before seeing this, it means you either need
> to
> > > increase your open file limits, OR you can simply increase the segment
> > size
> > > to reduce num. files via "StreamsConfig.TOPIC_PREFIX"to set the value
> of
> > > TopicConfig.SEGMENT_BYTES_CONFIG.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <ni...@gmail.com>
> > wrote:
> > >
> > > > Hi Kafka Devs & Users,
> > > >
> > > > We recently had an issue where we processed a lot of old data and we
> > > > crashed our brokers due to too many memory mapped files.
> > > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > > suboptimal in terms of resource management. (Keeping all files open
> all
> > > the
> > > > time, maybe there should be something managing this more on-demand?)
> > > >
> > > > In the issue I described, the repartition topic was produced very
> fast,
> > > > but not consumed, causing a lot of segments and files to be open at
> the
> > > > same time.
> > > >
> > > > I have worked around the issue by making sure I have more threads
> than
> > > > partitions to force tasks to subscribe to internal topics only, but
> > > seems a
> > > > bit hacky and maybe there should be some guidance in documentation if
> > > > considered part of design..
> > > >
> > > > After quite some testing and code reversing it seems that the nature
> of
> > > > this imbalance lies within how the broker multiplexes the consumed
> > > > topic-partitions.
> > > >
> > > > I have attached a slide that I will present to my team to explain the
> > > > issue in a bit more detail, it might be good to check it out to
> > > understand
> > > > the context.
> > > >
> > > > Any thoughts about my findings and concerns?
> > > >
> > > > Kind regards
> > > > Niklas
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Open files clogging and KafkaStreams

Posted by Guozhang Wang <wa...@gmail.com>.
I see.

What you described is a known issue in the older version of Kafka, that
some high traffic topics in the bootstrap mode may effectively "starve"
other topics in the fetch response, since brokers used to naively fill in
the bytes that meets the max.bytes configuration and returns. This is fixed
in 1.1 version via incremental fetch request:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

The basic idea is to not always request topics like A,B,C; instead if the
previous request asks for topics A,B,C and got all data from A, then next
request would be B,C,A, etc. So if you are on older versions of Kafka I'd
suggest you upgrade to newer version.

If you cannot upgrade atm, another suggest as I mentioned above is to
change the segment sizes so you can have much larger, and hence fewer
segment files.

Guozhang


On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <ni...@gmail.com> wrote:

> Hi Guozhang,
>
> I think I went a bit ahead of myself in describing the situation, I had an
> attachment with the context in detail, maybe it was filtered out. Lets try
> again =)
>
> We have a topology looking something like this:
>
> input-topic[20 partitions, compacted]
>     |
> use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
>     |
> use-case-changelog
>
> We have previously hit the TooManyOpenFiles issue and "solved" it by
> raising the bar to something extreme.
> Later we found out that we wanted rep factor 3 on all internal topics, so
> we reset the app and BOOM, now we hit a too many memory mapped files limit
> instead
>
> the input topic contains 30 days of data, where we pretty much have records
> in every 10minute window for every partition.
> This means if nothing consumes the repartition topic we will have 6 (10 min
> slots) * 24 hours * 30 days * 20 partitions * 3 (.index .log .timeindex
> files) * 3 replication factor / 5 brokers in cluster = *155.520 *open files
> just to have this repartition topic in place.
>
> You would say, yeah but no problem as it would be deleted and you would not
> reach such high numbers? But doesn't seem to be the case.
> What happened in our case is that, due to how the broker multiplexes the
> topic partitions for the subscribers, the streams application piled up all
> the repartition records, and only when caught up, all the downstream
> processes started taking place. I do see this as a design flaw in some
> component, probably the broker. It cant be the desired behaviour. How many
> open files do I need to be able to have open in a year of data when
> resetting/reprocessing an application?
>
> By adding more threads than input topic partitions, I managed to force the
> broker to give out these records earlier and issue was mitigated.
>
> Ideally the downstream records should be processed somewhere near in time
> as the source record.
>
> Lets take one partition, containing 1.000.000 records this is the observed
> behaviour I have seen: (Somewhat simplified)
>
> Time     Consumer offset Input topic     Records in input topic
>  Consumer offset repartition topic     Records in repartition topic
> 00:00    0                                               1.000.000
>              0                                                        0
> 00:01    100.000                                    1.000.000
>          0                                                        100.000
> 00:02    200.000                                    1.000.000
>          0                                                        200.000
> 00:03    300.000                                    1.000.000
>          0                                                        300.000
> 00:04    400.000                                    1.000.000
>          0                                                        400.000
> 00:05    500.000                                    1.000.000
>          0                                                        500.000
> 00:06    600.000                                    1.000.000
>          0                                                        600.000
> 00:07    700.000                                    1.000.000
>          0                                                        700.000
> 00:08    800.000                                    1.000.000
>          0                                                        800.000
> 00:09    900.000                                    1.000.000
>          0                                                        900.000
> 00:10    1.000.000                                 1.000.000
>        0                                                        1000.000
> 00:11    1.000.000                                 1.000.000
>        100.000                                             1000.000
> 00:12    1.000.000                                 1.000.000
>        200.000                                             1000.000
> 00:13    1.000.000                                 1.000.000
>        300.000                                             1000.000
> 00:14    1.000.000                                 1.000.000
>        400.000                                             1000.000
> 00:15    1.000.000                                 1.000.000
>        500.000                                             1000.000
> 00:16    1.000.000                                 1.000.000
>        600.000                                             1000.000
> 00:17    1.000.000                                 1.000.000
>        700.000                                             1000.000
> 00:18    1.000.000                                 1.000.000
>        800.000                                             1000.000
> 00:19    1.000.000                                 1.000.000
>        900.000                                             1000.000
> 00:20    1.000.000                                 1.000.000
>        1.000.000                                          1000.000
>
> As you can see, there is no parallel execution and its due to that the
> broker does not give any records from repartition topic until input topic
> is depleted.
> By adding more threads than input partitions I managed to mitigate this
> behaviour somewhat, but still not close to balanced.
>
> Ideally in such a situation where we rebuild stream states, I would more
> expect a behaviour like this:
>
> Time     Consumer offset Input topic     Records in input topic
>  Consumer offset repartition topic     Records in repartition topic
> 00:00    0                                               1.000.000
>              0                                                        0
> 00:01    100.000                                    1.000.000
>          0                                                        100.000
> 00:02    200.000                                    1.000.000
>          100.000                                             200.000
> 00:03    300.000                                    1.000.000
>          200.000                                             300.000
> 00:04    400.000                                    1.000.000
>          300.000                                             400.000
> 00:05    500.000                                    1.000.000
>          400.000                                             500.000
> 00:06    600.000                                    1.000.000
>          500.000                                             600.000
> 00:07    700.000                                    1.000.000
>          600.000                                             700.000
> 00:08    800.000                                    1.000.000
>          700.000                                             800.000
> 00:09    900.000                                    1.000.000
>          800.000                                             900.000
> 00:10    1.000.000                                 1.000.000
>        900.000                                             1000.000
> 00:10    1.000.000                                 1.000.000
>        1.000.000                                          1000.000
>
>
> Kind regards
> Niklas
>
> On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Niklas,
> >
> > If you can monitor your repartition topic's consumer lag, and it was
> > increasing consistently, it means your downstream processor cannot simply
> > keep up with the throughput of the upstream processor. Usually it means
> > your downstream operators is heavier (e.g. aggregations, joins that are
> all
> > stateful) than your upstreams (e.g. simply for shuffling the data to
> > repartition topics), and since tasks assignment only consider a task as
> the
> > smallest unit of work and did not differentiate "heavy" and "light"
> tasks,
> > such imbalance of task assignment may happen. At the moment, to resolve
> > this you should add more resources to make sure the heavy tasks get
> enough
> > computational resource assigned (more threads, e.g.).
> >
> > If your observed consumer lag stays plateau after increasing to some
> point,
> > it means your consumer can actually keep up with some constant lag; if
> you
> > hit your open file limits before seeing this, it means you either need to
> > increase your open file limits, OR you can simply increase the segment
> size
> > to reduce num. files via "StreamsConfig.TOPIC_PREFIX"to set the value of
> > TopicConfig.SEGMENT_BYTES_CONFIG.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <ni...@gmail.com>
> wrote:
> >
> > > Hi Kafka Devs & Users,
> > >
> > > We recently had an issue where we processed a lot of old data and we
> > > crashed our brokers due to too many memory mapped files.
> > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > suboptimal in terms of resource management. (Keeping all files open all
> > the
> > > time, maybe there should be something managing this more on-demand?)
> > >
> > > In the issue I described, the repartition topic was produced very fast,
> > > but not consumed, causing a lot of segments and files to be open at the
> > > same time.
> > >
> > > I have worked around the issue by making sure I have more threads than
> > > partitions to force tasks to subscribe to internal topics only, but
> > seems a
> > > bit hacky and maybe there should be some guidance in documentation if
> > > considered part of design..
> > >
> > > After quite some testing and code reversing it seems that the nature of
> > > this imbalance lies within how the broker multiplexes the consumed
> > > topic-partitions.
> > >
> > > I have attached a slide that I will present to my team to explain the
> > > issue in a bit more detail, it might be good to check it out to
> > understand
> > > the context.
> > >
> > > Any thoughts about my findings and concerns?
> > >
> > > Kind regards
> > > Niklas
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Open files clogging and KafkaStreams

Posted by Niklas Lönn <ni...@gmail.com>.
Hi Guozhang,

I think I went a bit ahead of myself in describing the situation, I had an
attachment with the context in detail, maybe it was filtered out. Lets try
again =)

We have a topology looking something like this:

input-topic[20 partitions, compacted]
    |
use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
    |
use-case-changelog

We have previously hit the TooManyOpenFiles issue and "solved" it by
raising the bar to something extreme.
Later we found out that we wanted rep factor 3 on all internal topics, so
we reset the app and BOOM, now we hit a too many memory mapped files limit
instead

the input topic contains 30 days of data, where we pretty much have records
in every 10minute window for every partition.
This means if nothing consumes the repartition topic we will have 6 (10 min
slots) * 24 hours * 30 days * 20 partitions * 3 (.index .log .timeindex
files) * 3 replication factor / 5 brokers in cluster = *155.520 *open files
just to have this repartition topic in place.

You would say, yeah but no problem as it would be deleted and you would not
reach such high numbers? But doesn't seem to be the case.
What happened in our case is that, due to how the broker multiplexes the
topic partitions for the subscribers, the streams application piled up all
the repartition records, and only when caught up, all the downstream
processes started taking place. I do see this as a design flaw in some
component, probably the broker. It cant be the desired behaviour. How many
open files do I need to be able to have open in a year of data when
resetting/reprocessing an application?

By adding more threads than input topic partitions, I managed to force the
broker to give out these records earlier and issue was mitigated.

Ideally the downstream records should be processed somewhere near in time
as the source record.

Lets take one partition, containing 1.000.000 records this is the observed
behaviour I have seen: (Somewhat simplified)

Time     Consumer offset Input topic     Records in input topic
 Consumer offset repartition topic     Records in repartition topic
00:00    0                                               1.000.000
             0                                                        0
00:01    100.000                                    1.000.000
         0                                                        100.000
00:02    200.000                                    1.000.000
         0                                                        200.000
00:03    300.000                                    1.000.000
         0                                                        300.000
00:04    400.000                                    1.000.000
         0                                                        400.000
00:05    500.000                                    1.000.000
         0                                                        500.000
00:06    600.000                                    1.000.000
         0                                                        600.000
00:07    700.000                                    1.000.000
         0                                                        700.000
00:08    800.000                                    1.000.000
         0                                                        800.000
00:09    900.000                                    1.000.000
         0                                                        900.000
00:10    1.000.000                                 1.000.000
       0                                                        1000.000
00:11    1.000.000                                 1.000.000
       100.000                                             1000.000
00:12    1.000.000                                 1.000.000
       200.000                                             1000.000
00:13    1.000.000                                 1.000.000
       300.000                                             1000.000
00:14    1.000.000                                 1.000.000
       400.000                                             1000.000
00:15    1.000.000                                 1.000.000
       500.000                                             1000.000
00:16    1.000.000                                 1.000.000
       600.000                                             1000.000
00:17    1.000.000                                 1.000.000
       700.000                                             1000.000
00:18    1.000.000                                 1.000.000
       800.000                                             1000.000
00:19    1.000.000                                 1.000.000
       900.000                                             1000.000
00:20    1.000.000                                 1.000.000
       1.000.000                                          1000.000

As you can see, there is no parallel execution and its due to that the
broker does not give any records from repartition topic until input topic
is depleted.
By adding more threads than input partitions I managed to mitigate this
behaviour somewhat, but still not close to balanced.

Ideally in such a situation where we rebuild stream states, I would more
expect a behaviour like this:

Time     Consumer offset Input topic     Records in input topic
 Consumer offset repartition topic     Records in repartition topic
00:00    0                                               1.000.000
             0                                                        0
00:01    100.000                                    1.000.000
         0                                                        100.000
00:02    200.000                                    1.000.000
         100.000                                             200.000
00:03    300.000                                    1.000.000
         200.000                                             300.000
00:04    400.000                                    1.000.000
         300.000                                             400.000
00:05    500.000                                    1.000.000
         400.000                                             500.000
00:06    600.000                                    1.000.000
         500.000                                             600.000
00:07    700.000                                    1.000.000
         600.000                                             700.000
00:08    800.000                                    1.000.000
         700.000                                             800.000
00:09    900.000                                    1.000.000
         800.000                                             900.000
00:10    1.000.000                                 1.000.000
       900.000                                             1000.000
00:10    1.000.000                                 1.000.000
       1.000.000                                          1000.000


Kind regards
Niklas

On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Niklas,
>
> If you can monitor your repartition topic's consumer lag, and it was
> increasing consistently, it means your downstream processor cannot simply
> keep up with the throughput of the upstream processor. Usually it means
> your downstream operators is heavier (e.g. aggregations, joins that are all
> stateful) than your upstreams (e.g. simply for shuffling the data to
> repartition topics), and since tasks assignment only consider a task as the
> smallest unit of work and did not differentiate "heavy" and "light" tasks,
> such imbalance of task assignment may happen. At the moment, to resolve
> this you should add more resources to make sure the heavy tasks get enough
> computational resource assigned (more threads, e.g.).
>
> If your observed consumer lag stays plateau after increasing to some point,
> it means your consumer can actually keep up with some constant lag; if you
> hit your open file limits before seeing this, it means you either need to
> increase your open file limits, OR you can simply increase the segment size
> to reduce num. files via "StreamsConfig.TOPIC_PREFIX"to set the value of
> TopicConfig.SEGMENT_BYTES_CONFIG.
>
>
> Guozhang
>
>
> On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <ni...@gmail.com> wrote:
>
> > Hi Kafka Devs & Users,
> >
> > We recently had an issue where we processed a lot of old data and we
> > crashed our brokers due to too many memory mapped files.
> > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > suboptimal in terms of resource management. (Keeping all files open all
> the
> > time, maybe there should be something managing this more on-demand?)
> >
> > In the issue I described, the repartition topic was produced very fast,
> > but not consumed, causing a lot of segments and files to be open at the
> > same time.
> >
> > I have worked around the issue by making sure I have more threads than
> > partitions to force tasks to subscribe to internal topics only, but
> seems a
> > bit hacky and maybe there should be some guidance in documentation if
> > considered part of design..
> >
> > After quite some testing and code reversing it seems that the nature of
> > this imbalance lies within how the broker multiplexes the consumed
> > topic-partitions.
> >
> > I have attached a slide that I will present to my team to explain the
> > issue in a bit more detail, it might be good to check it out to
> understand
> > the context.
> >
> > Any thoughts about my findings and concerns?
> >
> > Kind regards
> > Niklas
> >
>
>
> --
> -- Guozhang
>

Re: Open files clogging and KafkaStreams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Niklas,

If you can monitor your repartition topic's consumer lag, and it was
increasing consistently, it means your downstream processor cannot simply
keep up with the throughput of the upstream processor. Usually it means
your downstream operators is heavier (e.g. aggregations, joins that are all
stateful) than your upstreams (e.g. simply for shuffling the data to
repartition topics), and since tasks assignment only consider a task as the
smallest unit of work and did not differentiate "heavy" and "light" tasks,
such imbalance of task assignment may happen. At the moment, to resolve
this you should add more resources to make sure the heavy tasks get enough
computational resource assigned (more threads, e.g.).

If your observed consumer lag stays plateau after increasing to some point,
it means your consumer can actually keep up with some constant lag; if you
hit your open file limits before seeing this, it means you either need to
increase your open file limits, OR you can simply increase the segment size
to reduce num. files via "StreamsConfig.TOPIC_PREFIX"to set the value of
TopicConfig.SEGMENT_BYTES_CONFIG.


Guozhang


On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <ni...@gmail.com> wrote:

> Hi Kafka Devs & Users,
>
> We recently had an issue where we processed a lot of old data and we
> crashed our brokers due to too many memory mapped files.
> It seems to me that the nature of Kafka / Kafka Streams is a bit
> suboptimal in terms of resource management. (Keeping all files open all the
> time, maybe there should be something managing this more on-demand?)
>
> In the issue I described, the repartition topic was produced very fast,
> but not consumed, causing a lot of segments and files to be open at the
> same time.
>
> I have worked around the issue by making sure I have more threads than
> partitions to force tasks to subscribe to internal topics only, but seems a
> bit hacky and maybe there should be some guidance in documentation if
> considered part of design..
>
> After quite some testing and code reversing it seems that the nature of
> this imbalance lies within how the broker multiplexes the consumed
> topic-partitions.
>
> I have attached a slide that I will present to my team to explain the
> issue in a bit more detail, it might be good to check it out to understand
> the context.
>
> Any thoughts about my findings and concerns?
>
> Kind regards
> Niklas
>


-- 
-- Guozhang