Posted to jira@kafka.apache.org by "Navinder Brar (Jira)" <ji...@apache.org> on 2020/01/25 20:39:00 UTC

[jira] [Comment Edited] (KAFKA-9450) Decouple inner state flushing from committing with EOS

    [ https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17023643#comment-17023643 ] 

Navinder Brar edited comment on KAFKA-9450 at 1/25/20 8:38 PM:
---------------------------------------------------------------

I'm glad I came across this Jira. I had been meaning to start a discussion about decoupling flushing from committing. This ticket only deals with EOS; is that because the commit interval for non-EOS is 30 seconds? RocksDB flushing every 30 seconds is also a pretty big issue for us. I think the default "Level0FileNumCompactionTrigger" in RocksDB is 4. Since the default max write buffers in RocksDBStore.java is 3, 2 memtables get flushed every 30 seconds. When someone has multiple stores in a topology, this means that at least one store is undergoing compaction every 30 seconds (at most every 1 minute). A few days ago I was tracking CPU usage while seeing increased 95th-percentile latencies every 30 seconds, which overlapped exactly with the flush (which in turn leads to compaction), and there is a huge bump in CPU usage every 30 seconds. As a workaround for now, I have increased the commit interval in our system to 30 minutes (and even in 30 minutes our memtables are not full), but that is also inefficient. Let me know whether we can discuss non-EOS here as well, or whether I should create a separate ticket for it.
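
For reference, the interim workaround mentioned above (raising the commit interval to 30 minutes in a non-EOS application) might look roughly like the sketch below. The keys "commit.interval.ms" and "processing.guarantee" are the standard Kafka Streams config names; the class and method names are hypothetical, and a real application would pass these properties to its KafkaStreams instance along with its other settings.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka: builds only the two settings discussed here.
public class CommitIntervalSketch {

    static Properties withCommitIntervalMinutes(long minutes) {
        Properties props = new Properties();
        // Non-EOS processing; the default commit interval in this mode is 30 seconds.
        props.setProperty("processing.guarantee", "at_least_once");
        // Commit (and therefore flush) less often by stretching the interval.
        props.setProperty("commit.interval.ms",
                Long.toString(TimeUnit.MINUTES.toMillis(minutes)));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withCommitIntervalMinutes(30);
        System.out.println(props.getProperty("commit.interval.ms")); // prints 1800000
    }
}
```

This only spaces the flushes further apart; it does not decouple flushing from committing, which is what the ticket is about.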


was (Author: navibrar):
I'm glad I came across this Jira. I had been meaning to start a discussion about decoupling flushing from committing. This ticket only deals with EOS; is that because the commit interval for non-EOS is 30 seconds? RocksDB flushing every 30 seconds is also a pretty big issue for us. I think the default "Level0FileNumCompactionTrigger" in RocksDB is 4. Since the default max write buffers in RocksDBStore.java is 3, 2 memtables get flushed every 30 seconds. When someone has multiple stores in a topology, this means that at least one store is undergoing compaction every 30 seconds (at most every 1 minute). A few days ago I was tracking CPU usage while seeing increased 99th-percentile latencies every 30 seconds, which overlapped exactly with the flush (which in turn leads to compaction), and there is a huge bump in CPU usage every 30 seconds. As a workaround for now, I have increased the commit interval in our system to 30 minutes (and even in 30 minutes our memtables are not full), but that is also inefficient. Let me know whether we can discuss non-EOS here as well, or whether I should create a separate ticket for it.

> Decouple inner state flushing from committing with EOS
> ------------------------------------------------------
>
>                 Key: KAFKA-9450
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9450
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all the store layers are flushed during a commit. This is necessary for forwarding records in the cache to the changelog, but unfortunately also forces rocksdb to flush the current memtable before it's full. The result is a large number of small writes to disk, losing the benefits of batching, and a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an unclean shutdown with EOS, we may as well skip flushing the innermost StateStore during a commit and only do so during a graceful shutdown, before a rebalance, etc. This is currently blocked on a refactoring of the state store layers to allow decoupling the flush of the caching layer from the actual state store.
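
To give a rough sense of scale for the description above, the sketch below counts how many commit-driven flushes a single store would see per hour at the two commit intervals mentioned in this thread (100 ms with EOS, 30 seconds without). It assumes exactly one memtable flush per commit and ignores any flushes RocksDB triggers on its own; the class and method names are hypothetical.

```java
// Hypothetical back-of-the-envelope helper; assumes one memtable flush per commit.
public class FlushRateSketch {

    static long flushesPerHour(long commitIntervalMs) {
        long msPerHour = 60L * 60L * 1000L;
        return msPerHour / commitIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(flushesPerHour(100));    // EOS commit interval: prints 36000
        System.out.println(flushesPerHour(30_000)); // non-EOS default: prints 120
    }
}
```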



--
This message was sent by Atlassian Jira
(v8.3.4#803005)