Posted to user@flink.apache.org by shengjk1 <js...@163.com> on 2019/10/10 12:37:42 UTC

Re: Memory constraints running Flink on Kubernetes

+1


I also encountered a similar problem, but I run a Flink application that uses state in RocksDB on YARN, and the YARN container was killed because of OOM.
I also went through the RocksDB tuning guide [1] and tuned some parameters, but it didn't help. For example:


import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

class MyOptions1 implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
                .setDbWriteBufferSize(64 * 1024 * 1024)
                .setIncreaseParallelism(2)
                .setMaxBackgroundFlushes(2)
                .setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)
                .setMaxOpenFiles(4)
                .setUseFsync(false);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
                .setTableFormatConfig(
                        new BlockBasedTableConfig()
                                .setBlockCacheSize(16 * 1024 * 1024)
                                // increases read amplification but decreases memory usage and space amplification
                                .setBlockSize(16 * 1024 * 1024))
                .setWriteBufferSize(16 * 1024 * 1024)
                .setMaxWriteBufferNumber(1);
    }
}
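
For context, this is roughly how the factory is plugged into the job (a minimal sketch; the checkpoint URI here is just a placeholder):

import java.io.IOException;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // placeholder checkpoint URI; "true" enables incremental checkpoints
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        backend.setOptions(new MyOptions1());
        env.setStateBackend(backend);
        // ... job definition and env.execute() omitted ...
    }
}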


In addition, FLINK-7289 [2] describes an issue similar to ours, but I haven't found a good way to fix it.




[1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
[2] https://issues.apache.org/jira/browse/FLINK-7289






Best,
Shengjk1




On 07/24/2019 03:48, wvl <le...@gmail.com> wrote:
Hi,


We're running a relatively simple Flink application on Kubernetes that keeps a fair amount of state in RocksDB.
During development and the move to production, we found that we often ran into memory issues, made apparent by Kubernetes OOMKilled events and Java OOM log events.


In order to tackle these, we're trying to account for all the memory used in the container, to allow proper tuning.
Metric-wise we have:
- container_memory_working_set_bytes = 6.5 GB
- flink_taskmanager_Status_JVM_Memory_Heap_Max = 4.7 GB
- flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325 MB
- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500 MB



This is my understanding based on all the documentation and observations:
container_memory_working_set_bytes will be the total amount of memory in use, disregarding OS page & block cache.
Heap will be heap.
NonHeap is mostly the metaspace.
Direct_Memory is mostly network buffers.


Running the numbers, I have about 1 GB unaccounted for. I'm also uncertain about RocksDB. According to the docs, RocksDB has a "Column Family Write Buffer" for which "You need to budget for 2 x your worst case memory use".
We have 17 ValueStateDescriptors (ignoring state for windows), each of which I'm assuming corresponds to a column family in RocksDB, meaning our budget should be around 2 GB.
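
Spelling out that estimate, assuming the RocksDB defaults of a 64 MB write buffer and up to 2 write buffers per column family (we haven't overridden either):

public class WriteBufferBudget {
    public static void main(String[] args) {
        long columnFamilies = 17;                  // one per state descriptor
        long writeBufferSize = 64L * 1024 * 1024;  // RocksDB default write_buffer_size
        long buffersPerFamily = 2;                 // RocksDB default max_write_buffer_number
        long worstCaseBytes = columnFamilies * writeBufferSize * buffersPerFamily;
        // 17 * 64 MB * 2 = 2176 MB, i.e. roughly the 2 GB mentioned above
        System.out.println(worstCaseBytes / (1024 * 1024) + " MB");
    }
}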

Is this accounted for in one of the flink_taskmanager metrics above? We've also enabled various rocksdb metrics, but it's unclear where this Write Buffer memory would be represented.



Finally, we've seen that when our job has issues and is restarted rapidly, NonHeap_Used grows from an initial 50 MB to 700 MB before our containers are killed. We're assuming this is due to a lack of cleanup in the metaspace as classes get (re)loaded.


These are our taskmanager JVM settings: -XX:+UseG1GC -XX:MaxDirectMemorySize=1G -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2
With flink config:
      taskmanager.heap.size: 5000m
      state.backend: rocksdb
      state.backend.incremental: true
      state.backend.rocksdb.timer-service.factory: ROCKSDB


Based on what we've observed, we're thinking about setting -XX:MaxMetaspaceSize to a reasonable value, so that we at least get an error message that can easily be traced back to the behavior we're seeing.
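
For example, something along these lines in flink-conf.yaml (the 256m limit here is just a starting guess we would still need to validate):

      env.java.opts.taskmanager: "-XX:MaxMetaspaceSize=256m"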


Okay, all that said, let's sum up what we're asking here:
- Is there any more insight into how memory is accounted for than our current metrics?
- Which metric, if any, accounts for RocksDB memory usage?
- What's going on with the Metaspace growth we're seeing during job restarts, and is there something we can do about it, such as setting -XX:MaxMetaspaceSize?
- Any other tips to improve reliability when running in resource-constrained environments such as Kubernetes?


Thanks,


William



Re: Re: Memory constraints running Flink on Kubernetes

Posted by Yun Tang <my...@live.com>.
Hi Shengjk1

setBlockCacheSize, setWriteBufferSize and setMaxWriteBufferNumber can help you control memory usage. However, Flink stores each state in its own column family, which increases the number of column families, and each column family has its own write buffer. FRocksDB [1] already plans to fix this by introducing RocksDB's write buffer manager feature [2]. We will try to fix FLINK-7289 before the Flink 1.10 release.

If you urgently need to fix this problem, I have an unofficial build of FRocksDB based on RocksDB 5.18.3 which has been verified to work well in Gyula Fora's experience. You can contact me privately to get this jar package and rebuild your Flink runtime to enable the write buffer manager feature.
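
To give an idea, once the write buffer manager is exposed in the Java API, wiring it up through an OptionsFactory could look roughly like the sketch below (a sketch only; the class and method names follow the upstream RocksDB Java bindings and may differ in the FRocksDB build, and the sizes are just examples):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.WriteBufferManager;

class WriteBufferManagedOptions implements OptionsFactory {
    // Shared cache and write buffer manager capping total memtable memory
    // across all column families (example sizes: 512 MB cache, 256 MB memtables).
    // Static so the non-serializable RocksDB objects are not captured by the factory instance.
    private static final Cache SHARED_CACHE = new LRUCache(512 * 1024 * 1024);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
            new WriteBufferManager(256 * 1024 * 1024, SHARED_CACHE);

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions.setWriteBufferManager(WRITE_BUFFER_MANAGER);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Column family options left unchanged to keep the sketch minimal.
        return currentOptions;
    }
}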


[1] https://github.com/dataArtisans/frocksdb/pull/4
[2] https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#limit-total-memory-of-memtables

Best
Yun Tang
