You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Adam Binford (Jira)" <ji...@apache.org> on 2023/04/23 18:52:00 UTC

[jira] [Updated] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

     [ https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Adam Binford updated SPARK-43244:
---------------------------------
    Description: 
We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage (this link was helpful: [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. 

There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4.

  was:
We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing).

After reading about RocksDB memory usage ([this link was helpful]([https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. 

There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor.

The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. 

We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4.


> RocksDB State Store can accumulate unbounded native memory
> ----------------------------------------------------------
>
>                 Key: SPARK-43244
>                 URL: https://issues.apache.org/jira/browse/SPARK-43244
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.3.2
>            Reporter: Adam Binford
>            Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB that an executor with 20g of heap was using around 40g of resident memory. I noticed a single RocksDB instance was using around 150 MiB of memory, and only 5 MiB or so of this was from the write batch (which is now cleared after committing).
> After reading about RocksDB memory usage (this link was helpful: [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md]) I realized a lot of this was likely the "Index and Filters" memory usage. This job is doing a streaming duplicate with a lot of unique values so it makes sense these block usages would be high. The problem is that, because as it is now the underlying RocksDB instance stays open on an executor as long as that executor is assigned that stateful partition (to be reused across batches). So a single executor can accumulate a large number of RocksDB instances open at once, each using a certain amount of native memory. In the worst case, a single executor could need to keep open every single partitions' RocksDB instance at once. 
> There are a couple ways you can control the amount of memory used, such as limiting the max open files, or adding the option to use the block cache for the indices and filters, but neither of these solve the underlying problem of accumulating native memory from multiple partitions on an executor.
> The real fix needs to be a mechanism and option to close the underlying RocksDB instance at the end of each task, so you have the option to only ever have one RocksDB instance open at a time, thus having predictable memory usage no matter the size of your data or number of shuffle partitions. 
> We are running this on Spark 3.3, but just kicked off a test to see if things are any different in Spark 3.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org