Posted to users@kafka.apache.org by Patrik Kleindl <pk...@gmail.com> on 2019/03/01 08:14:50 UTC

Re: Minimizing global store restoration time

Hi Guozhang

I have created https://issues.apache.org/jira/browse/KAFKA-8023 and by
accident found https://issues.apache.org/jira/browse/KAFKA-6721 which was
what I was looking for at the beginning.
Does this need a KIP?
I can maybe help with the writeup but I am not sure I should help with the
code ;-)

KAFKA-6721 might indirectly cover point 1) from above. Currently (if I
understand Taylor correctly) it seems a bit inconsistent that normal
tables have separate configs for processing and restore while in the global
case both share one config, although that is understandably just a result of
using only one consumer for the global state stores.
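
For reference, a minimal sketch of the config split discussed in this thread. It uses plain string keys so it runs without the Kafka jars; the two prefix strings are my assumption of what StreamsConfig#globalConsumerPrefix and StreamsConfig#restoreConsumerPrefix prepend, and the class and method names are made up for illustration. Per the test results quoted below, only the global.consumer.* entries should affect GlobalKTable restoration.

```java
import java.util.Properties;

// Illustration only: class and method names are hypothetical, not part of Kafka.
class ConsumerPrefixSketch {
    // Assumed to be the prefixes added by StreamsConfig.globalConsumerPrefix(...)
    // and StreamsConfig.restoreConsumerPrefix(...).
    static final String GLOBAL_PREFIX = "global.consumer.";
    static final String RESTORE_PREFIX = "restore.consumer.";

    // Builds Streams-style properties that set the two tuned values for both consumers.
    static Properties build(int receiveBufferBytes, int fetchMaxBytes) {
        Properties props = new Properties();
        // Used by the global consumer, for restoration and for regular processing alike.
        props.put(GLOBAL_PREFIX + "receive.buffer.bytes", receiveBufferBytes);
        props.put(GLOBAL_PREFIX + "fetch.max.bytes", fetchMaxBytes);
        // Used by the restore consumer of normal (non-global) state stores.
        props.put(RESTORE_PREFIX + "receive.buffer.bytes", receiveBufferBytes);
        props.put(RESTORE_PREFIX + "fetch.max.bytes", fetchMaxBytes);
        return props;
    }

    public static void main(String[] args) {
        // "Default * 4" from the test further down: 65536 * 4 and 52428800 * 4.
        Properties props = build(65536 * 4, 52428800 * 4);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With the real library one would pass the constants through the StreamsConfig helpers instead of writing the keys by hand, as in the test quoted below.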

best regards
Patrik

On Thu, 28 Feb 2019 at 23:46, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Taylor,
>
> 1) Yes we do allow users to have separate config values for global
> consumers / restore consumers via StreamsConfig#restoreConsumerPrefix and
> StreamsConfig#globalConsumerPrefix, as Patrik pointed out.
>
> 2) I think I agree with you that for the global consumer, it is worthwhile to
> allow more than one update thread (for the restore consumer, though, we use
> the same stream thread by design, so that is much harder to re-architect).
> Would you mind creating a JIRA ticket for it so we do not
> forget about this potential improvement?
>
> Guozhang
>
> On Wed, Feb 27, 2019 at 2:02 PM Taylor P <td...@gmail.com> wrote:
>
> > Hi Guozhang, Patrik,
> >
> > Yes, the global consumer setting is what needs to be changed for these
> > settings. The restore consumer configs aren't used at all since a separate
> > restore consumer is not initialized for global state store restoration -
> > the global consumer is used. I think it would be an improvement to allow
> > for using different configs for the global consumer between restoration and
> > regular processing.
> >
> > I previously tried tweaking fetch.max.bytes and receive.buffer.bytes, but
> > if I recall correctly, I was still capped around 100K records/sec. I will
> > try tweaking them again when I get time.
> >
> > Is there anything major that would prevent parallelizing the restoration of
> > each partition of the global state store? It looks like that would be a
> > decent chunk of work to refactor, but I think that would have the biggest
> > impact in reducing global state restoration times for the scenario where
> > the keyspace of the global state store is very large.
> >
> > Taylor
> >
> >
> > On Thu, Feb 21, 2019 at 6:31 AM Patrik Kleindl <pk...@gmail.com> wrote:
> >
> > > Hello Guozhang,
> > >
> > > thanks, that might help us too.
> > > Just to confirm, this depends on KTable/GlobalKTable usage, right?
> > > I did a test with
> > >
> > > streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
> > > streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
> > > streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
> > > streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
> > >
> > > and only if I change the globalConsumerPrefix values I see a change for the
> > > GlobalKTable restoration.
> > >
> > > br, Patrik
> > >
> > > PS: Logs from the test, seems to work fine and get faster/slower depending
> > > on the change:
> > >
> > > Default Values:
> > >
> > > 743 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> > > 936 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
> > > 13378 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
> > > 26459 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
> > > 38178 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.
> > >
> > > Default * 4:
> > >
> > > 714 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> > > 862 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
> > > 10465 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
> > > 20014 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
> > > 29570 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.
> > > 40066 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
> > >
> > > Default / 4:
> > >
> > > 679 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl - global-stream-thread [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] Restoring state for global store topic-STATE-STORE-0000000000
> > > 725 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> > > 848 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
> > > 29283 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
> > > 56349 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
> > >
> > > On Wed, 20 Feb 2019 at 19:16, Guozhang Wang <wa...@gmail.com> wrote:
> > >
> > > > Hello Taylor,
> > > >
> > > > Sorry for the late reply! And thanks for the updated information.
> > > >
> > > > I'd recommend overriding some consumer configs via `StreamsConfig` (you can
> > > > use the StreamsConfig#restoreConsumerPrefix for that) for the following
> > > > props:
> > > >
> > > > 1) increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return earlier than
> > > > necessary)
> > > > 2) increase FETCH_MAX_BYTES_CONFIG if you'd expect the total size of your
> > > > maximum 2000 records to possibly exceed it (default is 50 MB).
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pk...@gmail.com> wrote:
> > > >
> > > > > Hi Taylor
> > > > > You are right, the parallel processing is not mentioned in this issue. If
> > > > > I remember correctly, it was in the thread that led to it, as a possibility
> > > > > when changing to the restoration listeners.
> > > > > Best regards
> > > > > Patrik
> > > > >
> > > > > > On 07.02.2019 at 00:47, Taylor P <td...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Patrik,
> > > > > >
> > > > > > > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> > > > > > > resolve this issue since our application is dependent on the global store
> > > > > > > being fully restored before the application can be considered healthy. It
> > > > > > > does not seem like KAFKA-7380 is aiming to address the nature of global
> > > > > > > stores restoring each partition sequentially - it is aiming to change the
> > > > > > > blocking nature of #start(). Restoring the global store partitions in
> > > > > > > parallel would definitely speed things up, though, and admittedly my first
> > > > > > > thought when debugging this was "why isn't this restoring each partition in
> > > > > > > parallel?".
> > > > > >
> > > > > > > Changing our streams topology to avoid using a global store for such a
> > > > > > > large amount of data would be doable but it does seem like a significant
> > > > > > > amount of work. I am curious to know if anyone else is storing large
> > > > > > > amounts of data in global stores and whether there are any inherent
> > > > > > > limitations to the size of global stores.
> > > > > >
> > > > > > Our topic is already using compaction.
> > > > > >
> > > > > > Taylor
> > > > > >
> > > > > > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pkleindl@gmail.com> wrote:
> > > > > >>
> > > > > >> Hi Taylor
> > > > > >>
> > > > > >> We are facing the same issue, although on a smaller scale.
> > > > > > >> The main problem, as you found, is that the restoration runs
> > > > > > >> sequentially. This should be addressed in
> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
> > > > > > >> no progress lately.
> > > > > > >>
> > > > > > >> On the other hand, you could try to re-evaluate whether your problem can
> > > > > > >> only be solved with global state stores. In our case (both in streams as
> > > > > > >> well as for interactive queries) we could solve it with local state stores
> > > > > > >> too, although only with more changes and more complexity in the topology.
> > > > > > >>
> > > > > > >> Not sure if it is applicable for your case, but have you looked into
> > > > > > >> compression for the topics?
> > > > > >>
> > > > > >> best regards
> > > > > >>
> > > > > >> Patrik
> > > > > >>
> > > > > > >>> On Tue, 5 Feb 2019 at 22:37, Taylor P <td...@gmail.com> wrote:
> > > > > >>>
> > > > > >>> Hi,
> > > > > >>>
> > > > > > >>> I am having issues with the global store taking a very long time to restore
> > > > > > >>> during startup of a Kafka Streams 2.0.1 application. The global store is
> > > > > > >>> backed by a RocksDB persistent store and is added to the Streams topology
> > > > > > >>> in the following manner: https://pastebin.com/raw/VJutDyYe The global store
> > > > > > >>> topic has approximately 15 million records per partition and 18 partitions.
> > > > > > >>> The following global consumer settings are specified:
> > > > > >>>
> > > > > >>>    poll.timeout.ms = 10
> > > > > >>>    max.poll.records = 2000
> > > > > >>>    max.partition.fetch.bytes = 1048576
> > > > > >>>    fetch.max.bytes = 52428800
> > > > > >>>    receive.buffer.bytes = 65536
> > > > > >>>
> > > > > > >>> I have tried tweaking the settings above on the consumer side, such as
> > > > > > >>> increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> > > > > > >>> max.partition.fetch.bytes to 52428800, but it seems that I keep hitting a
> > > > > > >>> ceiling of restoring approximately 100,000 records per second. With 15
> > > > > > >>> million records per partition, it takes approximately 150 seconds to
> > > > > > >>> restore a single partition. With 18 partitions, it takes roughly 45 minutes
> > > > > > >>> to fully restore the global store.
> > > > > >>>
> > > > > > >>> Switching from HDDs to SSDs on the brokers' log directories made
> > > > > > >>> restoration roughly 25% faster overall, but this still feels slow. It seems
> > > > > > >>> that I am hitting IOPS limits on the disks and am not even close to hitting
> > > > > > >>> the throughput limits of the disks on either the broker or streams
> > > > > > >>> application side.
> > > > > >>>
> > > > > > >>> How can I minimize restoration time of a global store? Are there settings
> > > > > > >>> that can increase throughput with the same number of IOPS? Ideally
> > > > > > >>> restoration of each partition could be done in parallel but I recognize
> > > > > > >>> there is only a single global store thread. Bringing up a new instance of
> > > > > > >>> the Kafka Streams application occurs on a potentially daily basis, so the
> > > > > > >>> restoration time is becoming more and more of a hassle.
> > > > > >>>
> > > > > >>> Thanks.
> > > > > >>>
> > > > > >>> Taylor
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Minimizing global store restoration time

Posted by Guozhang Wang <wa...@gmail.com>.
KAFKA-6721 <https://issues.apache.org/jira/browse/KAFKA-6721> should not
need a KIP, since it only refactors internal classes for cleaner code.
However, KAFKA-8023, which aims to add multi-thread support, would not be
covered by KAFKA-6721.
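
To put the multi-thread idea into numbers, here is a rough back-of-the-envelope sketch using the figures from Taylor's original post (15 million records per partition, 18 partitions, about 100,000 records per second). The class name, the method names, and the thread count of 6 are made up for illustration, and it assumes every restore thread could sustain the same rate as the single thread does today. Given the IOPS limits Taylor mentioned, that assumption may well not hold.

```java
// Illustration only: a naive estimate, not a measurement and not Kafka code.
class RestoreTimeEstimate {
    // Seconds to restore one partition at a fixed rate, rounded up.
    static long secondsPerPartition(long recordsPerPartition, long recordsPerSecond) {
        return (recordsPerPartition + recordsPerSecond - 1) / recordsPerSecond;
    }

    // Total seconds if partitions are restored in rounds of `threads` at a time,
    // assuming each thread keeps the full per-partition rate (an optimistic assumption).
    static long totalSeconds(int partitions, long secondsPerPartition, int threads) {
        long rounds = (partitions + threads - 1) / threads;
        return rounds * secondsPerPartition;
    }

    public static void main(String[] args) {
        long perPartition = secondsPerPartition(15_000_000L, 100_000L);
        System.out.println("per partition: " + perPartition + " s");
        System.out.println("sequential, 18 partitions: " + totalSeconds(18, perPartition, 1) + " s");
        System.out.println("6 hypothetical threads: " + totalSeconds(18, perPartition, 6) + " s");
    }
}
```

The sequential case works out to 18 x 150 s = 2700 s, which matches the roughly 45 minutes reported at the start of the thread.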


Guozhang

-- 
-- Guozhang