Posted to user@flink.apache.org by Natalie Dunn <na...@ec.ai> on 2022/02/01 16:15:53 UTC

Memory issues with Rocksdb ColumnFamilyOptions

Hi All,

I am trying to process a Savepoint in order to produce basic statistics on it for monitoring. I’m running into an issue where processing a large Savepoint runs out of memory before I can get through it completely.

One thing I noticed while profiling the code is that a lot of memory is attributed to the RocksDB ColumnFamilyOptions class, because it produces a large number of java.lang.ref.Finalizer objects that don’t seem to be garbage collected.

I see in the RocksDB code that these objects are supposed to be closed explicitly, but it doesn’t look like that is happening: https://github.com/facebook/rocksdb/blob/f57745814f2d9e937383b4bfea55373306182c14/java/src/main/java/org/rocksdb/AbstractNativeReference.java#L71

Is there a way to close these via the Flink API? Also, more generally, why am I seeing hundreds of thousands of these generated?
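
My understanding is that RocksJava objects such as ColumnFamilyOptions implement AutoCloseable and are meant to be closed explicitly when your own code owns them; the snippet below just illustrates that pattern (in my case the objects are created inside Flink, so I never get a handle to close):

import org.rocksdb.ColumnFamilyOptions;

// Illustrative pattern only: closing the options deterministically
// frees the native handle, so no java.lang.ref.Finalizer is left behind.
try (ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
    cfOptions.setWriteBufferSize(64 * 1024 * 1024);
    // ... use cfOptions while building a ColumnFamilyDescriptor ...
} // cfOptions.close() runs here and releases the native memory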

In case it’s helpful, here’s a genericized/simplified version of the code:


import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;

Configuration config = new Configuration();
config.setInteger("state.backend.rocksdb.files.open", 20000);
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
env.getConfig().enableObjectReuse();

EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
final EmbeddedRocksDBStateBackend configuredRocksDBStateBackend =
        stateBackend.configure(config, Thread.currentThread().getContextClassLoader());

// loadSavepoint() downloads the savepoint from our cloud storage and runs Savepoint.load()
ExistingSavepoint savepoint = loadSavepoint(env, configuredRocksDBStateBackend, savepointPath);

// FunctionStateReader() is a KeyedStateReaderFunction and does basic processing in readKey()
DataSource<StateReport> source = savepoint.readKeyedState("process-1", new FunctionStateReader());

final MapOperator<StateReport, Metrics> sizes =
        source.map(s -> new Metrics(s.key,
                        s.stateFields.values().stream().mapToInt(Integer::parseInt).sum(),
                        0, 0, 0, 0, 0, 0, 0))
                .returns(TypeInformation.of(new TypeHint<>() {}));

// MetricsRed() is a RichReduceFunction
DataSet<Metrics> stats = sizes.reduce(new MetricsRed());


If you spot anything wrong with this approach that would cause memory issues, please let me know; I am not 100% sure that the specific issue above is the full cause of the memory problems I have been seeing.

Thank you!
Natalie

RE: Memory issues with Rocksdb ColumnFamilyOptions

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Natalie,

I happen to be working on a similar problem at the moment: I’ve got a savepoint of about 40 GB for just one of the operator states, with 70 million keys.
With ExistingSavepoint there are currently a number of problems:

  *   When reading from a local copy of the savepoint, non-buffered I/O is used, and that takes forever
     *   As a PoC, I created a buffered façade around LocalFileSystem that replaces the original (about 30 times faster); see the first sketch after this list
  *   Once the keyed state and the timers have been loaded into a fresh RocksDB instance, the state is enumerated key by key, however:
     *   MultiStateKeyIterator uses Java Streams, and unfortunately the implementation has to load the complete key set into a buffer before it even returns a single key (the second sketch below shows why)
     *   I am also working on a PoC replacement for MultiStateKeyIterator that uses a traditional implementation without excessive buffering
     *   There are passing unit tests for the original implementation (RocksDBRocksStateKeysIteratorTest), but they only use small amounts of data, so the problem does not surface there
  *   I plan to file a number of tickets in Jira once I’m confident I can come up with a good recommendation
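
To sketch the buffering idea (names and details are illustrative, not my actual PoC; a real façade also needs to wrap FileSystem.open() so that Flink actually receives this stream):

import java.io.BufferedInputStream;
import java.io.IOException;
import org.apache.flink.core.fs.FSDataInputStream;

// Illustrative sketch: wraps an unbuffered savepoint stream in a
// BufferedInputStream while keeping seek()/getPos() consistent.
public class BufferedFSDataInputStream extends FSDataInputStream {

    private final FSDataInputStream delegate;
    private final int bufferSize;
    private BufferedInputStream buffered;
    private long pos;

    public BufferedFSDataInputStream(FSDataInputStream delegate, int bufferSize) {
        this.delegate = delegate;
        this.bufferSize = bufferSize;
        this.buffered = new BufferedInputStream(delegate, bufferSize);
    }

    @Override
    public void seek(long desired) throws IOException {
        // Drop any read-ahead data and reposition the underlying stream.
        delegate.seek(desired);
        buffered = new BufferedInputStream(delegate, bufferSize);
        pos = desired;
    }

    @Override
    public long getPos() {
        // The delegate has read ahead into the buffer, so track our own position.
        return pos;
    }

    @Override
    public int read() throws IOException {
        int b = buffered.read();
        if (b >= 0) pos++;
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = buffered.read(b, off, len);
        if (n > 0) pos += n;
        return n;
    }

    @Override
    public void close() throws IOException {
        buffered.close(); // also closes the delegate
    }
}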
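
And to illustrate the MultiStateKeyIterator point: when a flatMapped Stream is consumed through iterator(), the JDK drains the entire inner stream into an internal buffer before returning the first element (at least on the JDK versions I tested). A small standalone demo:

import java.util.Iterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapBufferingDemo {

    public static void main(String[] args) {
        // One outer element (think: one state descriptor) flatMapped to
        // many inner elements (think: all keys of that state).
        Iterator<Integer> keys = Stream.of("state-1")
                .flatMap(desc -> IntStream.range(0, 1_000)
                        .peek(i -> System.out.println("read key " + i))
                        .boxed())
                .iterator();

        // We ask for only the first key, yet "read key 0" .. "read key 999"
        // are all printed before it arrives: the inner stream was drained
        // into a buffer in one go.
        System.out.println("first key = " + keys.next());
    }
}

In the key iterator the inner stream is the full key set of one state, so that whole key set ends up in memory at once.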


Could the state size also be your problem? … What are your numbers?

I hope this helps or starts a discussion …

Sincere greetings

Thias


