Posted to dev@kafka.apache.org by "Jon Buffington (JIRA)" <ji...@apache.org> on 2017/04/26 09:43:04 UTC

[jira] [Commented] (KAFKA-5122) Kafka Streams unexpected off-heap memory growth

    [ https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984467#comment-15984467 ] 

Jon Buffington commented on KAFKA-5122:
---------------------------------------

There was not a leak; I was miscalculating expected memory usage.

My suggestion is to expand the capacity planning guide's documentation on estimating RocksDB off-heap memory usage for state stores. The guide says, "The default parameters may mean that each state store consumes anywhere between 50-100 MB of memory." That guidance did not apply in my case. I suggest providing a formula along the lines of:

{noformat}
write_buffer_size_mb = 32  # 0.10.2 default
write_buffer_count = 3     # 0.10.2 default
block_cache_size_mb = 100  # 0.10.2 default
estimate_per_segment = (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb
estimate_per_segment = 196m
{noformat}
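The per-segment formula can be expressed as a small Scala function. This is a minimal sketch. The object and parameter names are hypothetical. The defaults are the Kafka Streams 0.10.2 values (32 MB write buffer, 3 write buffers, 100 MB block cache), which reproduce the 196m figure. Like the formula, it counts only memtables and the block cache. Index and filter blocks kept outside the block cache, for example, are not included.

{code}
object RocksDbMemoryEstimate {
  // Rough off-heap estimate, in MB, for one RocksDB instance (one store segment).
  // Counts only the memtables (write buffers) and the block cache.
  def estimatePerInstanceMb(
      writeBufferSizeMb: Long = 32, // Kafka Streams 0.10.2 default
      writeBufferCount: Int = 3,    // Kafka Streams 0.10.2 default
      blockCacheSizeMb: Long = 100  // Kafka Streams 0.10.2 default
  ): Long =
    writeBufferSizeMb * writeBufferCount + blockCacheSizeMb

  def main(args: Array[String]): Unit = {
    println(estimatePerInstanceMb()) // 196
    // Tuned values: 2 write buffers and a 16 MB block cache.
    println(estimatePerInstanceMb(writeBufferCount = 2, blockCacheSizeMb = 16)) // 80
  }
}
{code}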

Then, to estimate the application's total RocksDB memory requirements:
{noformat}
store_partitions = 40  # in my application
segments = 3 # the default for windowed stores in 0.10.2
estimate = estimate_per_segment * store_partitions * segments
estimate = 23520m
{noformat}
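The application-level estimate multiplies the per-instance figure by the number of RocksDB instances. Each store partition of a windowed store holds one instance per segment. The sketch below compares the 0.10.2 defaults with the 80m per-instance figure of the tuned configuration in this comment. It assumes all 40 store partitions are hosted by a single application instance. When partitions are spread over several instances, each one hosts only its share. The names are hypothetical.

{code}
object AppRocksDbEstimate {
  // Total off-heap estimate in MB: per-instance estimate times the number of
  // RocksDB instances (store partitions * segments per windowed store).
  def totalMb(perInstanceMb: Long, storePartitions: Int, segments: Int): Long =
    perInstanceMb * storePartitions * segments

  def main(args: Array[String]): Unit = {
    val defaults = totalMb(perInstanceMb = 196, storePartitions = 40, segments = 3)
    val tuned = totalMb(perInstanceMb = 80, storePartitions = 40, segments = 3)
    println(s"defaults: ${defaults}m, tuned: ${tuned}m") // defaults: 23520m, tuned: 9600m
  }
}
{code}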

References:
* [https://github.com/facebook/rocksdb/issues/706]
* [https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB]

Based on my application's requirements and the above calculations, I ended up creating a RocksDBConfigSetter class:
{code}
import java.util.{Map => JMap}

import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, Options}

class AppRocksDBConfigSetter extends RocksDBConfigSetter {
  override def setConfig(storeName: String, options: Options, configs: JMap[String, AnyRef]): Unit = {
    val n = Runtime.getRuntime.availableProcessors
    // Improve write throughput by increasing compaction throughput.
    options.setMaxBackgroundCompactions(n)

    val tableConfig = new BlockBasedTableConfig()
    tableConfig.setBlockCacheSize(16 * 1024 * 1024L)    // Reduce block cache size from the default <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L75>
                                                        // because the total number of RocksDB instances is partitions (40) * segments (3) = 120.
    tableConfig.setBlockSize(16 * 1024L)                // Modify default <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L76>
                                                        // per <https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks>.
    tableConfig.setCacheIndexAndFilterBlocks(true)      // Charge index and filter blocks to the block cache so they cannot grow unbounded. See <https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks>
    options.setTableFormatConfig(tableConfig)

    options.setMaxWriteBufferNumber(2)                  // See <https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103>

    // The default Kafka Streams 0.10.2 write_buffer_size is 32m per <https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L74>.
    // Per RocksDB instance, the memory usage estimate is (32m * 2) + 16m = 80m.
    // The page-view application's estimated RocksDB off-heap memory is therefore 80m * 120 = 9600m.
  }
}
{code}
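The setter class only takes effect once it is registered in the Streams configuration. The following is a minimal sketch that assumes the class lives in a hypothetical com.example package. The key "rocksdb.config.setter" is the value of StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, and a real application would normally use that constant. A real application also sets application.id and bootstrap.servers, which are omitted here.

{code}
import java.util.Properties

object RegisterConfigSetter {
  // Builds Streams properties that point Kafka Streams at the custom setter.
  // "com.example.AppRocksDBConfigSetter" is a hypothetical fully qualified class name.
  def streamsProps(): Properties = {
    val props = new Properties()
    // Same key as StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG.
    props.put("rocksdb.config.setter", "com.example.AppRocksDBConfigSetter")
    // Five-minute commit interval, as in the original report.
    props.put("commit.interval.ms", "300000")
    props
  }

  def main(args: Array[String]): Unit =
    println(streamsProps().getProperty("rocksdb.config.setter"))
}
{code}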

> Kafka Streams unexpected off-heap memory growth
> -----------------------------------------------
>
>                 Key: KAFKA-5122
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5122
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>         Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>            Reporter: Jon Buffington
>            Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 20MB per commit interval. The application is configured with a 1G heap; the heap memory does not show signs of leaking. The application reaches 16g of system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit interval.
> * The application uses the schema registry for two pairs of serdes. One serde pair is used to read from a source topic that has 40 partitions. The other serde pair is used by the internal changelog and repartition topics created by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The only observed difference between the two JVM heap sizes is more old gen collections at 1024m although there is little difference in throughput. JVM settings are {-server -Djava.awt.headless=true -Xss256k -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per <http://mail-archives.apache.org/mod_mbox/kafka-users/201702.mbox/%3cCAHwHRrXxPwgYVr1CTWgoudKr7cqkaQ+52pHfpUZS4J-wv7K97w@mail.gmail.com%3e>, the SSTables are being compacted. Total disk usage for the state files (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> * Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, and 40).


