Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2016/12/02 23:13:57 UTC

Re: Initializing StateStores takes *really* long for large datasets

Before we have a single-knob memory management feature, I'd like to
propose reducing Streams' default config values for RocksDB caching and
memory block size. For example, I remember Henry has done some fine-tuning
on the RocksDB config for his use case:

https://github.com/HenryCaiHaiying/kafka/commit/b297f7c585f5a883ee068277e5f0f1224c347bd4
https://github.com/HenryCaiHaiying/kafka/commit/eed1726d16e528d813755a6e66b49d0bf14e8803
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576



We could check whether some of those changes are appropriate in general
and, if so, change the default settings accordingly.
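
In the meantime, anyone who wants to experiment can already override these
defaults through a RocksDBConfigSetter. A minimal sketch (the 16MB cache /
16KB block values below are only illustrative, not proposed defaults):

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    public class ReducedMemoryConfigSetter implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName,
                              final Options options,
                              final Map<String, Object> configs) {
            final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // down from 100MB
            tableConfig.setBlockSize(16 * 1024L);             // on-disk block size
            options.setTableFormatConfig(tableConfig);
        }
    }

and register it in the application's properties:

    Properties props = new Properties();
    props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
              ReducedMemoryConfigSetter.class);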

Henry

On Wed, Nov 30, 2016 at 11:04 AM, Ara Ebrahimi <ar...@argyledata.com> wrote:

> +1 on this.
>
> Ara.
>
> > On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak <mathieu.fenniak@replicon.com> wrote:
> >
> > I'd like to quickly reinforce Frank's opinion regarding the rocksdb
> > memory usage.  I was also surprised by the amount of non-JVM-heap memory
> > being used and had to tune the 100 MB default down considerably.  It's
> > also unfortunate that it's hard to estimate the memory requirements for
> > a KS app because of this.  If you have ten stores, and assuming the
> > default config, you'd need a GB of memory for the rocksdb cache if you
> > run 1 app, but only half a GB if you run two app instances because the
> > stores will be distributed.
> >
> > It would be much nicer to be able to give KS a fixed amount of memory in
> > a config that it divided among the active stores on a node.  Configure
> > it with N GB; if a rebalance adds more tasks and stores, they each get
> > less RAM; if a rebalance removes tasks and stores, the remaining stores
> > get more RAM.  It seems like it'd be hard to do this with the
> > RocksDBConfigSetter interface because it doesn't get any state about the
> > KS topology to make decisions; which are arguably not config, but
> > tuning / performance decisions.
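> >
> > To sketch what I mean (hypothetical, since the config setter is not
> > told how many stores are active on the node today):
> >
> >     // Hypothetical helper: split a fixed off-heap budget evenly
> >     // across the stores currently active on this node.
> >     static long perStoreCacheBytes(final long totalBudgetBytes,
> >                                    final int activeStoreCount) {
> >         return totalBudgetBytes / Math.max(1, activeStoreCount);
> >     }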
> >
> > Mathieu
> >
> >
> >
> > On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu <fl...@gmail.com> wrote:
> >
> >> I'll write an update on where I am now.
> >>
> >> I've got about 40 'primary' topics, some small, some up to about 10M
> >> messages, and about 30 internal topics, divided over 6 stream
> >> instances, all running in a single app, talking to a 3-node Kafka
> >> cluster.
> >>
> >> I use a single thread per stream instance, as my prime concern is now
> >> to get it to run stably, rather than optimizing performance.
> >>
> >> My biggest issue was that after a few hours my application started to
> >> slow down and ultimately freeze up or crash. It turned out that
> >> RocksDb had consumed all my memory, which I overlooked as it was
> >> off-heap.
> >>
> >> I was fooling around with RocksDb settings a bit, but I had missed the
> >> most important one:
> >>
> >>        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
> >>        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); // per-store block cache (default 100 MB)
> >>        tableConfig.setBlockSize(BLOCK_SIZE);            // size of each on-disk data block
> >>        options.setTableFormatConfig(tableConfig);       // apply the table settings to this store
> >>
> >> The block cache size defaults to a whopping 100 MB per store, and that
> >> gets expensive fast. I reduced it to a few megabytes. My data size is
> >> so big that I doubt it is very effective anyway. Now it seems more
> >> stable.
> >>
> >> I'd say that a smaller default makes sense, especially because the
> >> failure case is so opaque (all tests run just fine, but with a serious
> >> dataset it dies slowly).
> >>
> >> Another thing I see is that while starting all my instances, some are
> >> quick and some take time (makes sense, as the data size varies
> >> greatly), but as more instances start up, they use so much CPU, I/O
> >> and network that the initialization of the bigger ones takes even
> >> longer, increasing the chance that one of them exceeds
> >> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we
> >> can separate the 'initialize' and 'start' steps somehow.
> >>
> >> In this case we could also log better: if initialization takes longer
> >> than the timeout, the task ends up being reassigned (in my case to the
> >> same instance) and then errors out on being unable to lock the state
> >> dir. That message isn't too informative, as the timeout is the actual
> >> problem.
> >>
> >> regards, Frank
> >>
> >>
> >> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang <wa...@gmail.com> wrote:
> >>
> >>> Hello Frank,
> >>>
> >>> How many instances do you have in your app, and how many threads per
> >>> instance? Note that besides the topology complexity (i.e. number of
> >>> state stores, number of internal topics, etc.), the (re-)initialization
> >>> process depends on the underlying consumer's membership protocol, and
> >>> hence its rebalance latency could be longer with larger groups.
> >>>
> >>> We have been optimizing our rebalance latency due to state store
> >>> migration and restoration in the latest release, but for now the
> >>> re-initialization latency still largely depends on 1) the topology
> >>> complexity with regard to state stores, and 2) the number of input
> >>> topic partitions and instances / threads in the application.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>> On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy <da...@gmail.com> wrote:
> >>>
> >>>> Hi Frank,
> >>>>
> >>>> If you are running on a single node then the RocksDB state should be
> >>>> re-used by your app. However, it relies on the app being cleanly shut
> >>>> down and on the existence of ".checkpoint" files in the state
> >>>> directory for the store, i.e.,
> >>>> /tmp/kafka-streams/application-id/0_0/.checkpoint. If the file
> >>>> doesn't exist then the entire state will be restored from the
> >>>> changelog - which could take some time. I suspect this is what is
> >>>> happening?
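> >>>>
> >>>> A quick way to check (just a sketch; adjust the path for your state
> >>>> dir, applicationId and task ids):
> >>>>
> >>>>     import java.nio.file.Files;
> >>>>     import java.nio.file.Paths;
> >>>>
> >>>>     // if this is false, expect a full restore from the changelog
> >>>>     boolean hasCheckpoint = Files.exists(Paths.get(
> >>>>         "/tmp/kafka-streams", "application-id", "0_0", ".checkpoint"));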
> >>>>
> >>>> As for the RocksDB memory settings, yes, the off-heap memory usage
> >>>> does sneak under the radar. There is a memory management story for
> >>>> Kafka Streams that is yet to be started. This would involve limiting
> >>>> the off-heap memory that RocksDB uses.
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Fri, 25 Nov 2016 at 21:14 Frank Lyaruu <fl...@gmail.com> wrote:
> >>>>
> >>>>> I'm running all on a single node, so there is no 'data mobility'
> >>>>> involved. So if Streams does not use any existing data, I might as
> >>>>> well wipe the whole RocksDb before starting, right?
> >>>>>
> >>>>> As for the RocksDb tuning, I am using a RocksDBConfigSetter to
> >>>>> reduce the memory usage a bit:
> >>>>>
> >>>>> options.setWriteBufferSize(3000000);        // ~3 MB memtable per store
> >>>>> options.setMaxBytesForLevelBase(30000000);  // ~30 MB cap for level 1
> >>>>> options.setMaxBytesForLevelMultiplier(3);   // each level 3x the previous
> >>>>>
> >>>>> I needed to do this as my 16 GB machine would die otherwise, but I
> >>>>> honestly was just reducing values more or less randomly until it
> >>>>> wouldn't fall over. I have to say this is a big drawback of Rocks: I
> >>>>> monitor Java memory usage, but this just sneaks under the radar as
> >>>>> it is off-heap, and it isn't very clear what the implications of
> >>>>> different settings are, as I can't say something like the Xmx heap
> >>>>> setting, meaning: take whatever you need up to this maximum. Also,
> >>>>> if I get this right, in the long run, as the data set changes and
> >>>>> grows, I can never be sure it won't take too much memory.
> >>>>>
> >>>>> I get the impression I'll be better off with an external store,
> >>>>> something I can monitor, tune and restart separately.
> >>>>>
> >>>>> But I'm getting ahead of myself. I'll wipe the data before I start
> >>>>> and see if that gets me any stability.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, Nov 25, 2016 at 4:54 PM, Damian Guy <da...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Frank,
> >>>>>>
> >>>>>> If you have run the app before with the same applicationId,
> >>>>>> completely shut it down, and then restarted it again, it will need
> >>>>>> to restore all of the state, which will take some time depending on
> >>>>>> the amount of data you have. In this case the placement of the
> >>>>>> partitions doesn't take into account any existing state stores, so
> >>>>>> it might need to load quite a lot of data if nodes assigned certain
> >>>>>> partitions don't have that state store (this is something we should
> >>>>>> look at improving).
> >>>>>>
> >>>>>> As for RocksDB tuning - you can provide an implementation of
> >>>>>> RocksDBConfigSetter via the config
> >>>>>> StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG. It has a single
> >>>>>> method:
> >>>>>>
> >>>>>> public void setConfig(final String storeName, final Options options,
> >>>>>>                       final Map<String, Object> configs)
> >>>>>>
> >>>>>> In this method you can set various options on the provided Options
> >>>>>> object. The options that might help in this case are:
> >>>>>> options.setWriteBufferSize(..) - default in streams is 32MB
> >>>>>> options.setMaxWriteBufferNumber(..) - default in streams is 3
> >>>>>>
> >>>>>> However, I'm no expert on RocksDB and I suggest you have a look at
> >>>>>> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for
> >>>>>> more info.
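> >>>>>>
> >>>>>> A minimal sketch of such a setter (the values are placeholders to
> >>>>>> illustrate the calls, not recommendations):
> >>>>>>
> >>>>>>     import java.util.Map;
> >>>>>>
> >>>>>>     import org.apache.kafka.streams.state.RocksDBConfigSetter;
> >>>>>>     import org.rocksdb.Options;
> >>>>>>
> >>>>>>     public class SmallWriteBufferConfigSetter implements RocksDBConfigSetter {
> >>>>>>         @Override
> >>>>>>         public void setConfig(final String storeName,
> >>>>>>                               final Options options,
> >>>>>>                               final Map<String, Object> configs) {
> >>>>>>             options.setWriteBufferSize(8 * 1024 * 1024L); // memtable: 8MB instead of 32MB
> >>>>>>             options.setMaxWriteBufferNumber(2);           // 2 memtables instead of 3
> >>>>>>         }
> >>>>>>     }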
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Damian
> >>>>>>
> >>>>>> On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu <fl...@gmail.com> wrote:
> >>>>>>
> >>>>>>> @Damian:
> >>>>>>>
> >>>>>>> Yes, it ran before, and it has that 200 GB blob worth of Rocksdb
> >>>>>>> stuff.
> >>>>>>>
> >>>>>>> @Svente: It's on a pretty high-end SAN in a managed private cloud.
> >>>>>>> I'm unsure what the ultimate storage is, but I doubt there is a
> >>>>>>> performance problem there.
> >>>>>>>
> >>>>>>> On Fri, 25 Nov 2016 at 13:37, Svante Karlsson <svante.karlsson@csi.se> wrote:
> >>>>>>>
> >>>>>>>> What kind of disk are you using for the rocksdb store? i.e.
> >>>>>>>> spinning or SSD?
> >>>>>>>>
> >>>>>>>> 2016-11-25 12:51 GMT+01:00 Damian Guy <da...@gmail.com>:
> >>>>>>>>
> >>>>>>>>> Hi Frank,
> >>>>>>>>>
> >>>>>>>>> Is this on a restart of the application?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Damian
> >>>>>>>>>
> >>>>>>>>> On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu <flyaruu@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi y'all,
> >>>>>>>>>>
> >>>>>>>>>> I have a reasonably simple KafkaStreams application, which
> >>>>>>>>>> merges about 20 topics a few times. The thing is, some of those
> >>>>>>>>>> topic datasets are pretty big, about 10M messages. In total I've
> >>>>>>>>>> got about 200 GB worth of state in RocksDB; the largest topic is
> >>>>>>>>>> 38 GB.
> >>>>>>>>>>
> >>>>>>>>>> I had set MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> >>>>>>>>>> initialization time, but that does not seem nearly enough; I'm
> >>>>>>>>>> looking at startup times of more than two hours, and that
> >>>>>>>>>> starts to be a bit ridiculous.
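> >>>>>>>>>>
> >>>>>>>>>> (For reference, that looked something like this in my config:)
> >>>>>>>>>>
> >>>>>>>>>>     // set on the StreamsConfig properties; forwarded to the
> >>>>>>>>>>     // underlying consumer
> >>>>>>>>>>     props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> >>>>>>>>>>               60 * 60 * 1000); // one hour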
> >>>>>>>>>>
> >>>>>>>>>> Any tips / experiences on how to deal with this case? Move away
> >>>>>>>>>> from Rocks and use an external data store? Any tips on how to
> >>>>>>>>>> tune Rocks to be a bit more useful here?
> >>>>>>>>>>
> >>>>>>>>>> regards, Frank
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
> >
> >



-- 
-- Guozhang