Posted to user@flink.apache.org by Kevin Lam <ke...@shopify.com> on 2021/09/30 15:37:30 UTC

RocksDB: Spike in Memory Usage Post Restart

Hi all,

We're debugging an issue with OOMs that occur on our jobs shortly after a
restore from checkpoint. Our application is running on Kubernetes and uses
RocksDB as its state backend.

We reproduced the issue on a small cluster of 2 task managers. If we kill a
single task manager, we notice that after restoring from checkpoint, the
untouched task manager has an elevated memory footprint (see the blue line
for the surviving task manager):

[image: image.png]
If we then kill the newest TM (yellow line) again, the surviving task
manager gets OOM-killed after the restore.

We looked at the OOMKiller Report and it seems that the memory is not
coming from the JVM but we're unsure of the source. It seems like something
is allocating native memory that the JVM is not aware of.

We're suspicious of RocksDB. Has anyone seen this kind of issue before? Is
it possible there's some kind of memory pressure or memory leak coming from
RocksDB that only presents itself when a job is restarted? Perhaps
something isn't cleaned up?

Any help would be appreciated.

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Yun Tang <my...@live.com>.
Hi Yaroslav,

I don't think disabling the block cache is the correct way to manage the memory usage of RocksDB. Moreover, it cannot actually limit the memory usage.

RocksDB has two main parts of memory usage per column family: the write path, used by the write buffers, and the read path, used to cache index/filter/data blocks.
Apart from the memory model of RocksDB itself, Flink does not limit the number of states within one operator (which means it does not limit the number of column families in one RocksDB instance) and also allows one slot to contain several operators (which means it does not limit the number of RocksDB instances in one slot). For these reasons, disabling managed memory and the block cache is not the correct way to control memory. You might not hit the OOM problem in some jobs, but you cannot ensure that all jobs behave well.
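
If the goal is to bound RocksDB's memory, the supported way is to keep managed memory enabled, so that all RocksDB instances in a slot share one block cache and write buffer manager, or to give an explicit per-slot budget. A rough sketch (the keys are the standard Flink options, the values below are only illustrative):

state.backend.rocksdb.memory.managed: true
# or, alternatively, an explicit budget per slot:
# state.backend.rocksdb.memory.fixed-per-slot: 512mb
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1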

I think we should still focus on finding what actually takes the additional memory. First of all, give yourself some additional headroom via taskmanager.memory.jvm-overhead.max and taskmanager.memory.jvm-overhead.min. Then use a re-built jemalloc with profiling enabled [1] and pass it into the container's running environment [2]. This debugging phase is common for native development; Apache Doris also has a guide for it, but using tcmalloc [3]. We can then analyze the prof call stacks to figure out what the native memory is used for.
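
For example, forwarding the jemalloc profiling settings into the container via the Flink configuration could look roughly like this (the keys come from [2]; the library path and profiling values are only illustrative, and the jemalloc build must have profiling enabled):

containerized.taskmanager.env.LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libjemalloc.so
containerized.taskmanager.env.MALLOC_CONF: prof:true,lg_prof_interval:30,lg_prof_sample:17,prof_prefix:/tmp/jeprof.out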

[1] https://technology.blog.gov.uk/2015/12/11/using-jemalloc-to-get-to-the-bottom-of-a-memory-leak/
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#forwarding-environment-variables
[3] https://doris.apache.org/master/en/developer-guide/debug-tool.html#memory

Best
Yun Tang

________________________________
From: Yaroslav Tkachenko <ya...@shopify.com>
Sent: Monday, October 11, 2021 3:04
To: Yun Tang <my...@live.com>
Cc: Ammon Diether <ad...@gmail.com>; Kevin Lam <ke...@shopify.com>; Fabian Paul <fa...@ververica.com>; user <us...@flink.apache.org>
Subject: Re: RocksDB: Spike in Memory Usage Post Restart

A quick update on this, we were able to fix the memory leak by disabling block cache in RocksDB with:

state.backend.rocksdb.options-factory: xxx.NoBlockCacheRocksDbOptionsFactory
state.backend.rocksdb.memory.managed: false

Where NoBlockCacheRocksDbOptionsFactory essentially does:

val blockBasedTableConfig = new BlockBasedTableConfig()
blockBasedTableConfig.setNoBlockCache(true)
// Needed in order to disable block cache
blockBasedTableConfig.setCacheIndexAndFilterBlocks(false)
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(false)
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(false)

We did NOT see big performance degradation when running on SSDs.
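
For reference, the full factory is roughly the following. This is a sketch against Flink's RocksDBOptionsFactory interface; only the column family options are touched, and the class/package placement is our own:

import java.util.Collection

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

class NoBlockCacheRocksDbOptionsFactory extends RocksDBOptionsFactory {

  // Leave the DBOptions that Flink has already prepared untouched.
  override def createDBOptions(
      currentOptions: DBOptions,
      handlesToClose: Collection[AutoCloseable]): DBOptions = currentOptions

  // Replace the table format config so that no block cache is created.
  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: Collection[AutoCloseable]): ColumnFamilyOptions = {
    val blockBasedTableConfig = new BlockBasedTableConfig()
    blockBasedTableConfig.setNoBlockCache(true)
    // Needed in order to disable the block cache: nothing may be cached or pinned.
    blockBasedTableConfig.setCacheIndexAndFilterBlocks(false)
    blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(false)
    blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(false)
    currentOptions.setTableFormatConfig(blockBasedTableConfig)
  }
}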

On Fri, Oct 8, 2021 at 3:41 AM Yun Tang <my...@live.com>> wrote:
Hi Kevin,

Sorry for jumping in late, as we were on vacation.

Since you already referred to the doc https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf, have you figured out the offending native calls via jemalloc and jeprof?

From my experience, there are two general kinds of native memory leak (leaving aside stack or static memory, which should not consume much in general cases, and mmap'ed memory usage):

  1.  Malloc'ed memory is not freed in time, or is never freed. This can be diagnosed with jemalloc or tcmalloc by forwarding the related flags to Flink's container environment [1]. You can increase taskmanager.memory.jvm-overhead.max [2] and taskmanager.memory.jvm-overhead.min [3] to leave enough headroom while you figure out what occupies the memory. We have observed that unzipping configuration files too frequently can also consume a lot of native memory.
  2.  The native code has freed the memory, but the underlying memory allocator does not return it to the OS in time. The default allocator in glibc does not behave well here compared with jemalloc and tcmalloc, which is why we try to change the default memory allocator to jemalloc [4]. You can use 'pmap' or the cgroup info to check whether the running process has loaded jemalloc.so, i.e. whether the intended memory allocator is in use.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#forwarding-environment-variables
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-max
[3] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-min
[4] https://issues.apache.org/jira/browse/FLINK-19125

Best
Yun Tang






________________________________
From: Ammon Diether <ad...@gmail.com>>
Sent: Thursday, October 7, 2021 12:39
To: Kevin Lam <ke...@shopify.com>>
Cc: Fabian Paul <fa...@ververica.com>>; user <us...@flink.apache.org>>
Subject: Re: RocksDB: Spike in Memory Usage Post Restart

I don't mean to derail or take away from this thread, only to second that I am seeing the same behavior. We are using Flink Stateful Functions 3.0.0 on Flink 1.12.1 in a Kubernetes environment.

In the graph, a little after 15:00 a few of the task managers (5 of 64) were moved to a different node and restarted (a Kubernetes scale-down was the reason). The new task managers spun up, but the long-running task managers' memory unexpectedly goes up.
[image.png]

On Wed, Oct 6, 2021 at 9:52 AM Kevin Lam <ke...@shopify.com>> wrote:
Hi Fabian,

Yes I can tell you a bit more about the job we are seeing the problem with. I'll simplify things a bit but this captures the essence:

1. Input datastreams are from a few Kafka sources that we intend to join.
2. We wrap the datastreams we want to join into a common container class and key them on the join key.
3. Union and process the datastreams with a KeyedProcessFunction which holds the latest value seen for each source in ValueStates, and emits an output that is a function of the stored ValueStates each time a new value comes in.
4. We have to support arbitrarily late arriving data, so we don't window, and just keep everything in ValueState.
5. The state we want to support is very large, on the order of several TBs.

On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul <fa...@ververica.com>> wrote:
Hi Kevin,

Since you are seeing the problem across multiple Flink versions, with both the default RocksDB configuration and a custom one, it might be related to something else. A lot of different components can allocate direct memory, e.g. some filesystem implementations, the connectors, or a user gRPC dependency.


Can you tell us a bit more about the job you are seeing the problem with?

Best,
Fabian


Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Yaroslav Tkachenko <ya...@shopify.com>.
A quick update on this, we were able to fix the memory leak by disabling
block cache in RocksDB with:

state.backend.rocksdb.options-factory: xxx.NoBlockCacheRocksDbOptionsFactory
state.backend.rocksdb.memory.managed: false

Where NoBlockCacheRocksDbOptionsFactory essentially does:

val blockBasedTableConfig = new BlockBasedTableConfig()
blockBasedTableConfig.setNoBlockCache(true)
// Needed in order to disable block cache
blockBasedTableConfig.setCacheIndexAndFilterBlocks(false)
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(false)
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(false)

We did NOT see big performance degradation when running on SSDs.

On Fri, Oct 8, 2021 at 3:41 AM Yun Tang <my...@live.com> wrote:

> Hi Kevin,
>
> Sorry for jumping in late, as we were on vacation.
>
> Since you already referred to the doc
> https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf,
> have you figured out the offending native calls via jemalloc and jeprof?
>
> From my experience, there are two general kinds of native memory leak
> (leaving aside stack or static memory, which should not consume much in
> general cases, and mmap'ed memory usage):
>
>    1. Malloc'ed memory is not freed in time, or is never freed. This can
>    be diagnosed with jemalloc or tcmalloc by forwarding the related flags
>    to Flink's container environment [1]. You can increase
>    taskmanager.memory.jvm-overhead.max [2]
>    and taskmanager.memory.jvm-overhead.min [3] to leave enough headroom
>    while you figure out what occupies the memory. We have observed that
>    unzipping configuration files too frequently can also consume a lot of
>    native memory.
>    2. The native code has freed the memory, but the underlying memory
>    allocator does not return it to the OS in time. The default allocator in
>    glibc does not behave well here compared with jemalloc and tcmalloc,
>    which is why we try to change the default memory allocator to jemalloc
>    [4]. You can use 'pmap' or the cgroup info to check whether the running
>    process has loaded jemalloc.so, i.e. whether the intended memory
>    allocator is in use.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#forwarding-environment-variables
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-max
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-min
> [4] https://issues.apache.org/jira/browse/FLINK-19125
>
> Best
> Yun Tang
>
>
>
>
>
>
> ------------------------------
> *From:* Ammon Diether <ad...@gmail.com>
> *Sent:* Thursday, October 7, 2021 12:39
> *To:* Kevin Lam <ke...@shopify.com>
> *Cc:* Fabian Paul <fa...@ververica.com>; user <us...@flink.apache.org>
> *Subject:* Re: RocksDB: Spike in Memory Usage Post Restart
>
> I don't mean to derail or take away from this thread, only to second that
> I am seeing the same behavior. We are using Flink Stateful Functions 3.0.0
> on Flink 1.12.1 in a Kubernetes environment.
>
> In the graph, a little after 15:00 a few of the task managers (5 of 64)
> were moved to a different node and restarted (a Kubernetes scale-down was
> the reason). The new task managers spun up, but the long-running task
> managers' memory unexpectedly goes up.
> [image: image.png]
>
> On Wed, Oct 6, 2021 at 9:52 AM Kevin Lam <ke...@shopify.com> wrote:
>
> Hi Fabian,
>
> Yes I can tell you a bit more about the job we are seeing the problem
> with. I'll simplify things a bit but this captures the essence:
>
> 1. Input datastreams are from a few kafka sources that we intend to join.
> 2. We wrap the datastreams we want to join into a common container class
> and key them on the join key.
> 3. Union and process the datastreams with a KeyedProcessFunction which
> holds the latest value seen for each source in ValueStates, and emits an
> output that is the function of the stored ValueStates each time a new value
> comes in.
> 4. We have to support arbitrarily late arriving data, so we don't window,
> and just keep everything in ValueState.
> 5. The state we want to support is very large, on the order of several
> TBs.
>
> On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul <fa...@ververica.com>
> wrote:
>
> Hi Kevin,
>
> Since you are seeing the problem across multiple Flink versions and with
> the default RocksDb and custom configuration it might be related
>  to something else. A lot of different components can allocate direct
> memory i.e. some filesystem implementations, the connectors or some user
> grpc dependency.
>
>
> Can you tell us a bit more about the job you are seeing the problem with?
>
> Best,
> Fabian
>
>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Yun Tang <my...@live.com>.
Hi Kevin,

Sorry for jumping in late, as we were on vacation.

Since you already referred to the doc https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf, have you figured out the offending native calls via jemalloc and jeprof?

From my experience, there are two general kinds of native memory leak (leaving aside stack or static memory, which should not consume much in general cases, and mmap'ed memory usage):

  1.  Malloc'ed memory is not freed in time, or is never freed. This can be diagnosed with jemalloc or tcmalloc by forwarding the related flags to Flink's container environment [1]. You can increase taskmanager.memory.jvm-overhead.max [2] and taskmanager.memory.jvm-overhead.min [3] to leave enough headroom while you figure out what occupies the memory. We have observed that unzipping configuration files too frequently can also consume a lot of native memory.
  2.  The native code has freed the memory, but the underlying memory allocator does not return it to the OS in time. The default allocator in glibc does not behave well here compared with jemalloc and tcmalloc, which is why we try to change the default memory allocator to jemalloc [4]. You can use 'pmap' or the cgroup info to check whether the running process has loaded jemalloc.so, i.e. whether the intended memory allocator is in use.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#forwarding-environment-variables
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-max
[3] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-min
[4] https://issues.apache.org/jira/browse/FLINK-19125

Best
Yun Tang






________________________________
From: Ammon Diether <ad...@gmail.com>
Sent: Thursday, October 7, 2021 12:39
To: Kevin Lam <ke...@shopify.com>
Cc: Fabian Paul <fa...@ververica.com>; user <us...@flink.apache.org>
Subject: Re: RocksDB: Spike in Memory Usage Post Restart

I don't mean to derail or take away from this thread, only to second that I am seeing the same behavior. We are using Flink Stateful Functions 3.0.0 on Flink 1.12.1 in a Kubernetes environment.

In the graph, a little after 15:00 a few of the task managers (5 of 64) were moved to a different node and restarted (a Kubernetes scale-down was the reason). The new task managers spun up, but the long-running task managers' memory unexpectedly goes up.
[image.png]

On Wed, Oct 6, 2021 at 9:52 AM Kevin Lam <ke...@shopify.com>> wrote:
Hi Fabian,

Yes I can tell you a bit more about the job we are seeing the problem with. I'll simplify things a bit but this captures the essence:

1. Input datastreams are from a few kafka sources that we intend to join.
2. We wrap the datastreams we want to join into a common container class and key them on the join key.
3. Union and process the datastreams with a KeyedProcessFunction which holds the latest value seen for each source in ValueStates, and emits an output that is the function of the stored ValueStates each time a new value comes in.
4. We have to support arbitrarily late arriving data, so we don't window, and just keep everything in ValueState.
5. The state we want to support is very large, on the order of several TBs.

On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul <fa...@ververica.com>> wrote:
Hi Kevin,

Since you are seeing the problem across multiple Flink versions and with the default RocksDb and custom configuration it might be related
 to something else. A lot of different components can allocate direct memory i.e. some filesystem implementations, the connectors or some user grpc dependency.


Can you tell us a bit more about the job you are seeing the problem with?

Best,
Fabian


Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Yaroslav Tkachenko <ya...@shopify.com>.
And we also found this amazing issue
https://github.com/facebook/rocksdb/issues/4112 that makes me wonder why we
don't see more complaints :)

A similar issue that was closed, but not resolved
https://issues.apache.org/jira/browse/FLINK-21986

On Wed, Oct 6, 2021 at 9:39 PM Ammon Diether <ad...@gmail.com> wrote:

> I don't mean to derail or take away from this thread, only to second that
> I am seeing the same behavior. We are using Flink Stateful Functions 3.0.0
> on Flink 1.12.1 in a Kubernetes environment.
>
> In the graph, a little after 15:00 a few of the task managers (5 of 64)
> were moved to a different node and restarted (a Kubernetes scale-down was
> the reason). The new task managers spun up, but the long-running task
> managers' memory unexpectedly goes up.
> [image: image.png]
>
> On Wed, Oct 6, 2021 at 9:52 AM Kevin Lam <ke...@shopify.com> wrote:
>
>> Hi Fabian,
>>
>> Yes I can tell you a bit more about the job we are seeing the problem
>> with. I'll simplify things a bit but this captures the essence:
>>
>> 1. Input datastreams are from a few kafka sources that we intend to join.
>> 2. We wrap the datastreams we want to join into a common container class
>> and key them on the join key.
>> 3. Union and process the datastreams with a KeyedProcessFunction which
>> holds the latest value seen for each source in ValueStates, and emits an
>> output that is the function of the stored ValueStates each time a new value
>> comes in.
>> 4. We have to support arbitrarily late arriving data, so we don't window,
>> and just keep everything in ValueState.
>> 5. The state we want to support is very large, on the order of several
>> TBs.
>>
>> On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul <fa...@ververica.com>
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> Since you are seeing the problem across multiple Flink versions and with
>>> the default RocksDb and custom configuration it might be related
>>>  to something else. A lot of different components can allocate direct
>>> memory i.e. some filesystem implementations, the connectors or some user
>>> grpc dependency.
>>>
>>>
>>> Can you tell us a bit more about the job you are seeing the problem
>>> with?
>>>
>>> Best,
>>> Fabian
>>>
>>>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Ammon Diether <ad...@gmail.com>.
I don't mean to derail or take away from this thread, only to second that I
am seeing the same behavior. We are using Flink Stateful Functions 3.0.0 on
Flink 1.12.1 in a Kubernetes environment.

In the graph, a little after 15:00 a few of the task managers (5 of 64) were
moved to a different node and restarted (a Kubernetes scale-down was the
reason). The new task managers spun up, but the long-running task managers'
memory unexpectedly goes up.
[image: image.png]

On Wed, Oct 6, 2021 at 9:52 AM Kevin Lam <ke...@shopify.com> wrote:

> Hi Fabian,
>
> Yes I can tell you a bit more about the job we are seeing the problem
> with. I'll simplify things a bit but this captures the essence:
>
> 1. Input datastreams are from a few kafka sources that we intend to join.
> 2. We wrap the datastreams we want to join into a common container class
> and key them on the join key.
> 3. Union and process the datastreams with a KeyedProcessFunction which
> holds the latest value seen for each source in ValueStates, and emits an
> output that is the function of the stored ValueStates each time a new value
> comes in.
> 4. We have to support arbitrarily late arriving data, so we don't window,
> and just keep everything in ValueState.
> 5. The state we want to support is very large, on the order of several
> TBs.
>
> On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul <fa...@ververica.com>
> wrote:
>
>> Hi Kevin,
>>
>> Since you are seeing the problem across multiple Flink versions and with
>> the default RocksDb and custom configuration it might be related
>>  to something else. A lot of different components can allocate direct
>> memory i.e. some filesystem implementations, the connectors or some user
>> grpc dependency.
>>
>>
>> Can you tell us a bit more about the job you are seeing the problem with?
>>
>> Best,
>> Fabian
>>
>>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Kevin Lam <ke...@shopify.com>.
Hi Fabian,

Yes I can tell you a bit more about the job we are seeing the problem with.
I'll simplify things a bit but this captures the essence:

1. Input datastreams are from a few Kafka sources that we intend to join.
2. We wrap the datastreams we want to join into a common container class
and key them on the join key.
3. Union and process the datastreams with a KeyedProcessFunction which
holds the latest value seen for each source in ValueStates, and emits an
output that is a function of the stored ValueStates each time a new value
comes in (a rough sketch follows after this list).
4. We have to support arbitrarily late arriving data, so we don't window,
and just keep everything in ValueState.
5. The state we want to support is very large, on the order of several TBs.
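
In rough pseudo-Scala, the KeyedProcessFunction from step 3 looks something
like this (the container/output types, field names, and the two-source
simplification are made up for illustration; only the shape matters):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical container and output types, for illustration only.
case class Wrapped(source: String, joinKey: String, payload: String)
case class Joined(joinKey: String, a: Option[String], b: Option[String])

class LatestValueJoin extends KeyedProcessFunction[String, Wrapped, Joined] {

  // One ValueState per source, holding the latest value seen for the key.
  @transient private var latestA: ValueState[String] = _
  @transient private var latestB: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    latestA = getRuntimeContext.getState(new ValueStateDescriptor("latest-a", classOf[String]))
    latestB = getRuntimeContext.getState(new ValueStateDescriptor("latest-b", classOf[String]))
  }

  override def processElement(
      value: Wrapped,
      ctx: KeyedProcessFunction[String, Wrapped, Joined]#Context,
      out: Collector[Joined]): Unit = {
    // Store the latest value for whichever source this record came from ...
    if (value.source == "a") latestA.update(value.payload) else latestB.update(value.payload)
    // ... and emit an output computed from whatever is currently stored.
    out.collect(Joined(value.joinKey, Option(latestA.value()), Option(latestB.value())))
  }
}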

On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul <fa...@ververica.com>
wrote:

> Hi Kevin,
>
> Since you are seeing the problem across multiple Flink versions and with
> the default RocksDb and custom configuration it might be related
>  to something else. A lot of different components can allocate direct
> memory i.e. some filesystem implementations, the connectors or some user
> grpc dependency.
>
>
> Can you tell us a bit more about the job you are seeing the problem with?
>
> Best,
> Fabian
>
>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Fabian Paul <fa...@ververica.com>.
Hi Kevin,

Since you are seeing the problem across multiple Flink versions, with both the default RocksDB configuration and a custom one, it might be related to something else. A lot of different components can allocate direct memory, e.g. some filesystem implementations, the connectors, or a user gRPC dependency.


Can you tell us a bit more about the job you are seeing the problem with?

Best,
Fabian


Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Kevin Lam <ke...@shopify.com>.
Hi Fabian,

Thanks for collecting feedback. Here are the answers to your questions:

1. Yes, we enabled incremental checkpoints for our job by setting
`state.backend.incremental` to true. As for whether the checkpoint we
recover from is incremental or not, I'm not sure how to determine that.
It's whatever Flink does by default with incremental checkpoints enabled.

2. Yes this was on purpose, we had tuned our job to work well on SSDs. We
have also run jobs with those parameters unset and using defaults, and
still have the same OOM issues.

Thanks for the pointer; yes, we've been looking at the RocksDB metrics, but
they haven't pointed us to the issue yet.

On Wed, Oct 6, 2021 at 3:21 AM Fabian Paul <fa...@ververica.com> wrote:

> Hi Kevin,
>
> Sorry for the late reply. I collected some feedback from other folks and
> have two more questions.
>
> 1. Did you enable incremental checkpoints for your job and is the
> checkpoint you recover from incremental?
>
> 2. I saw in your configuration that you set
> `state.backend.rocksdb.block.cache-size` and
> `state.backend.rocksdb.predefined.options` by doing
>  so you overwrite the values Flink automatically sets. Can you confirm
> that this is on purpose? The value for block.cache-size seems to be very
> small.
>
> You can also enable the native RocksDB metrics [1] to get a more detailed
> view of the RocksDB memory consumption, but be careful because it may
> degrade the performance of your job.
>
> Best,
> Fabian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
>
>
>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Fabian Paul <fa...@ververica.com>.
Hi Kevin,

Sorry for the late reply. I collected some feedback from other folks and have two more questions.

1. Did you enable incremental checkpoints for your job and is the checkpoint you recover from incremental? 

2. I saw in your configuration that you set `state.backend.rocksdb.block.cache-size` and `state.backend.rocksdb.predefined-options`; by doing so you overwrite the values Flink automatically sets. Can you confirm that this is on purpose? The value for block.cache-size seems very small.

You can also enable the native RocksDB metrics [1] to get a more detailed view of the RocksDB memory consumption, but be careful because it may degrade the performance of your job.
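
For comparison, relying on Flink's automatic sizing would look roughly like this (a sketch; the fraction value is only illustrative):

state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
taskmanager.memory.managed.fraction: 0.4
# no state.backend.rocksdb.block.cache-size / state.backend.rocksdb.predefined-options overrides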

Best,
Fabian

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics



Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Kevin Lam <ke...@shopify.com>.
I was reading a bit about RocksDB and it seems the Java version is somewhat
particular about how it should be closed to ensure all resources are
released:

https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management

   - "Many of the Java Objects used in the RocksJava API will be backed by
   C++ objects for which the Java Objects have ownership. As C++ has no notion
   of automatic garbage collection for its heap in the way that Java does, we
   must explicitly free the memory used by the C++ objects when we are
   finished with them."

Column families also have a specific close procedure

https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families

   - "It is important to note that when working with Column Families in
   RocksJava, there is a very specific order of destruction that must be
   obeyed for the database to correctly free all resources and shutdown."
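
As far as I understand it, the documented order looks roughly like this in
plain RocksJava (a sketch, not Flink code; the path and column family name
are made up):

import java.util.{ArrayList => JArrayList}

import org.rocksdb.{ColumnFamilyDescriptor, ColumnFamilyHandle, ColumnFamilyOptions, DBOptions, RocksDB}

object ColumnFamilyCloseOrder {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val cfOptions = new ColumnFamilyOptions()
    val descriptors = new JArrayList[ColumnFamilyDescriptor]()
    descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions))
    descriptors.add(new ColumnFamilyDescriptor("my-state".getBytes, cfOptions))

    val handles = new JArrayList[ColumnFamilyHandle]()
    val dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
    val db = RocksDB.open(dbOptions, "/tmp/rocksdb-example", descriptors, handles)
    try {
      // ... read and write state ...
    } finally {
      // Column family handles must be closed before the database itself,
      // and the options objects after that, otherwise native memory leaks.
      handles.forEach(_.close())
      db.close()
      dbOptions.close()
      cfOptions.close()
    }
  }
}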

When a running job fails and a surviving TaskManager restores from
checkpoint, is the old embedded RocksDB instance being cleaned up properly?
I wasn't really sure where to look in the Flink source code to verify this.

On Mon, Oct 4, 2021 at 4:56 PM Kevin Lam <ke...@shopify.com> wrote:

> We tried with 1.14.0, unfortunately we still run into the issue. Any
> thoughts or suggestions?
>
> On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam <ke...@shopify.com> wrote:
>
>> Hi Fabian,
>>
>> We're using our own image built from the official Flink docker image, so
>> we should have the code to use jemalloc in the docker entrypoint.
>>
>> I'm going to give 1.14 a try and will let you know how it goes.
>>
>> On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul <fa...@ververica.com>
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> We bumped the RocksDB version in Flink 1.14, which we expected to improve
>>> memory control [1]. In the past we also saw problems with the memory
>>> allocator used by the OS; we switched to jemalloc within our Docker images,
>>> which has better memory fragmentation behavior [2]. Are you using the
>>> official Flink docker image or did you build your own?
>>>
>>> I am also pulling in Yun Tang, who is more familiar with Flink’s state
>>> backend. Maybe he has an immediate idea about your problem.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14482
>>> [2]
>>> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>>>
>>>
>>>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Kevin Lam <ke...@shopify.com>.
We tried with 1.14.0, unfortunately we still run into the issue. Any
thoughts or suggestions?

On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam <ke...@shopify.com> wrote:

> Hi Fabian,
>
> We're using our own image built from the official Flink docker image, so
> we should have the code to use jemalloc in the docker entrypoint.
>
> I'm going to give 1.14 a try and will let you know how it goes.
>
> On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul <fa...@ververica.com>
> wrote:
>
>> Hi Kevin,
>>
>> We bumped the RocksDB version in Flink 1.14, which we expected to improve
>> memory control [1]. In the past we also saw problems with the memory
>> allocator used by the OS; we switched to jemalloc within our Docker images,
>> which has better memory fragmentation behavior [2]. Are you using the
>> official Flink docker image or did you build your own?
>>
>> I am also pulling in Yun Tang, who is more familiar with Flink’s state
>> backend. Maybe he has an immediate idea about your problem.
>>
>> Best,
>> Fabian
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14482
>> [2]
>> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>>
>>
>>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Kevin Lam <ke...@shopify.com>.
Hi Fabian,

We're using our own image built from the official Flink docker image, so we
should have the code to use jemalloc in the docker entrypoint.

I'm going to give 1.14 a try and will let you know how it goes.

On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul <fa...@ververica.com> wrote:

> Hi Kevin,
>
> We bumped the RocksDB version in Flink 1.14, which we expected to improve
> memory control [1]. In the past we also saw problems with the memory
> allocator used by the OS; we switched to jemalloc within our Docker images,
> which has better memory fragmentation behavior [2]. Are you using the
> official Flink docker image or did you build your own?
>
> I am also pulling in Yun Tang, who is more familiar with Flink’s state
> backend. Maybe he has an immediate idea about your problem.
>
> Best,
> Fabian
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-14482
> [2]
> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>
>
>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Fabian Paul <fa...@ververica.com>.
Hi Kevin,

We bumped the RocksDB version in Flink 1.14, which we expected to improve memory control [1]. In the past we also saw problems with the memory allocator used by the OS; we switched to jemalloc within our Docker images, which has better memory fragmentation behavior [2]. Are you using the official Flink docker image or did you build your own?

I am also pulling in Yun Tang, who is more familiar with Flink’s state backend. Maybe he has an immediate idea about your problem.

Best,
Fabian


[1] https://issues.apache.org/jira/browse/FLINK-14482
[2] https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E



Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Kevin Lam <ke...@shopify.com>.
Hi Fabian,

Thanks for your response.

Sure, let me tell you a bit more about the job.

   - Flink version 1.13.1 (I also tried 1.13.2 because I saw FLINK-22886
   <https://issues.apache.org/jira/browse/FLINK-22886>, but this didn't
   help)
   - We're running on Kubernetes in an application cluster.
   taskmanager.memory.process.size = 16GB, but we give our task manager pods a
   memory limit of 20GB. Our full config is below [0]

We've followed the steps at
https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf
,
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html,
and
https://technology.blog.gov.uk/2015/12/11/using-jemalloc-to-get-to-the-bottom-of-a-memory-leak/
to try to diagnose the issue, but this didn't give us much to go on.

Notably, we baselined the jcmd memory profile (jcmd $(pgrep java)
VM.native_memory baseline) and then ran a diff before and after the
post-restart memory spike, and nothing in there reflects the few GB of
usage increase.

What was added to Flink 1.14? What other issues have you seen in the past?

Also I came across
https://medium.com/expedia-group-tech/solving-a-native-memory-leak-71fe4b6f9463
when researching RocksDB. It suggests that unclosed RocksDB iterators can
be a source of memory leaks. Is there any chance iterators are being left
open after a job restart?
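
For context, the leak pattern described there is an iterator backed by
native memory that is never closed; in plain RocksJava the safe pattern is
roughly this (a sketch, not Flink code):

import org.rocksdb.{RocksDB, RocksIterator}

// Hypothetical helper: count all keys, making sure the native iterator is closed.
def countKeys(db: RocksDB): Long = {
  val iter: RocksIterator = db.newIterator()
  try {
    var count = 0L
    iter.seekToFirst()
    while (iter.isValid) {
      count += 1
      iter.next()
    }
    count
  } finally {
    // A RocksIterator holds native resources; forgetting close() leaks them
    // even after the Java object is garbage collected.
    iter.close()
  }
}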

[0]
```
jobmanager.memory.process.size: 16Gb

taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 16Gb
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

high-availability.storageDir: <redacted>
kubernetes.cluster-id: <redacted>
kubernetes.namespace: <redacted>
high-availability.jobmanager.port: 50010
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
restart-strategy: exponential-delay
resourcemanager.taskmanager-registration.timeout: 30 min

blob.server.port: 6124
queryable-state.proxy.ports: 6125

heartbeat.interval: 60000
heartbeat.timeout: 120000

web.timeout: 1800000
rest.flamegraph.enabled: true

state.backend: rocksdb
state.checkpoints.dir: <redacted>
state.savepoints.dir: <redacted>

state.backend.rocksdb.localdir: /rocksdb
state.backend.incremental: true
state.backend.fs.memory-threshold: 1m
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.block.blocksize: 16KB
state.backend.rocksdb.block.cache-size: 64MB
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

jobmanager.execution.failover-strategy: region

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator

state.backend.rocksdb.metrics.actual-delayed-write-rate: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.cur-size-active-mem-table: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.is-write-stopped: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true
state.backend.rocksdb.metrics.num-deletes-active-mem-table: true
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables: true
state.backend.rocksdb.metrics.num-entries-active-mem-table: true
state.backend.rocksdb.metrics.num-entries-imm-mem-tables: true
state.backend.rocksdb.metrics.num-immutable-mem-table: true
state.backend.rocksdb.metrics.num-live-versions: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.num-snapshots: true
state.backend.rocksdb.metrics.size-all-mem-tables: true

env.java.opts: -Djavax.net.ssl.keyStore=/app/kafka/certs/certificate.jks
-Djavax.net.ssl.keyStorePassword=changeit -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=1099
-Dcom.sun.management.jmxremote.rmi.port=1099
-Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
env.java.opts.taskmanager: -Dtaskmanager.host=10.12.72.181
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/rocksdb/memdump.hprof
-Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
jobmanager.rpc.address: flink-jobmanager
query.server.port: 6125
```

On Fri, Oct 1, 2021 at 9:38 AM Fabian Paul <fa...@ververica.com> wrote:

> Hi Kevin,
>
> You are right, RocksDB is probably responsible for the memory consumption
> you are noticing. We have definitely seen similar issues in the past, and
> with the latest Flink version 1.14 we tried to restrict the RocksDB memory
> consumption even more to make it more controllable.
>
> Can you tell us a bit more about the job you are using and the respective
> Flink version? I would also be interested in what memory configuration you
> used on the Flink cluster, e.g. taskmanager.memory.process.size. You can
> also have a look at the following docs page [1] to fine-tune the memory
> consumption of your job.
>
> Please let me know if that helps.
>
> Best,
> Fabian
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory
>
>
>

Re: RocksDB: Spike in Memory Usage Post Restart

Posted by Fabian Paul <fa...@ververica.com>.
Hi Kevin,

You are right, RocksDB is probably responsible for the memory consumption you are noticing. We have definitely seen similar issues in the past, and
with the latest Flink version 1.14 we tried to restrict the RocksDB memory consumption even more to make it more controllable.

Can you tell us a bit more about the job you are using and the respective Flink version? I would also be interested in what memory configuration you
used on the Flink cluster, e.g. taskmanager.memory.process.size. You can also have a look at the following docs page [1] to
fine-tune the memory consumption of your job.

Please let me know if that helps.

Best,
Fabian


[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory