Posted to user@flink.apache.org by Steven Nelson <sn...@sourceallies.com> on 2020/05/27 15:30:19 UTC

Memory issue in Flink 1.10

We recently migrated to Flink 1.10, but are experiencing some issues with
memory.

Our cluster is:
1) Running inside of Kubernetes
2) Running in HA mode
3) Checkpointing/Savepointing to an HDFS cluster inside of Kubernetes
4) Using RocksDB for checkpointing
5) Running on m5d.4xlarge EC2 instances with 64 GB of RAM
6) The taskmanager pods do not have a memory limit set on them
7) We set taskmanager.memory.process.size to 48g

We get the following error:
2020-05-27 10:12:34
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_376501a366f04bbaab99945c23a40da5_(28/32) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:272)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:165)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
    at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:767)
    at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:823)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:883)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:727)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
    at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.flink.core.io.VersionedIOReadableWritable.read(VersionedIOReadableWritable.java:45)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:133)
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:187)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    ... 15 more

Do I need to be more explicit with the off-heap memory for RocksDB?

-Steve

Re: Memory issue in Flink 1.10

Posted by Andrey Zagrebin <az...@apache.org>.
Hi Steve,

RocksDB does not contribute to the JVM direct memory. RocksDB off-heap
memory consumption is part of managed memory [1].

You got `OutOfMemoryError: Direct buffer memory`, which is related to
the JVM direct memory: also off-heap, but managed by the JVM.
The JVM direct memory limit depends on the corresponding JVM argument of
the Flink process: *-XX:MaxDirectMemorySize* [2].
You can increase the direct memory limit by increasing this
configuration option: taskmanager.memory.task.off-heap.size [3].
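For example, an illustrative flink-conf.yaml fragment (the 512m value is
just a starting point to tune, not a recommendation for your workload):

```yaml
# Total TaskManager process memory, as you already have it.
taskmanager.memory.process.size: 48g
# Budget for direct/native memory used by user code and libraries
# such as the HDFS client; in Flink 1.10 this defaults to 0 bytes.
taskmanager.memory.task.off-heap.size: 512m
```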

Judging from the stack trace you provided, the HDFS client used by the
RocksDB state backend needs more JVM direct memory for the remote reads
that restore state.
It can also be that there are other consumers of direct memory in your
application, and the HDFS client was simply the one that hit the limit.
See also the troubleshooting guide for `OutOfMemoryError: Direct buffer
memory` [4].

Note that Flink's network buffers also use JVM direct memory, but Flink
makes sure that they do not exceed their own limit [5].
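
To illustrate how the limit is composed: per [2], the TaskManager's
-XX:MaxDirectMemorySize in 1.10 is the sum of framework off-heap, task
off-heap, and network memory. A rough sketch of that arithmetic (the
component values below are illustrative, not taken from your setup):

```python
def max_direct_memory_mb(framework_off_heap_mb: int,
                         task_off_heap_mb: int,
                         network_mb: int) -> int:
    """Flink 1.10 TaskManager direct memory budget:
    -XX:MaxDirectMemorySize = framework off-heap + task off-heap + network.
    """
    return framework_off_heap_mb + task_off_heap_mb + network_mb

# With the 1.10 defaults (framework off-heap 128m, task off-heap 0m)
# and, say, 1024m of network memory, the direct memory limit is 1152m,
# all of which the network stack may already claim for its buffers.
print(max_direct_memory_mb(128, 0, 1024))
```

So raising taskmanager.memory.task.off-heap.size directly grows the
headroom left for other direct memory users like the HDFS client.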

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#managed-memory
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#jvm-parameters
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#configure-off-heap-memory-direct-or-native
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_trouble.html#outofmemoryerror-direct-buffer-memory
[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

On Wed, May 27, 2020 at 6:30 PM Steven Nelson <sn...@sourceallies.com>
wrote:

> We recently migrated to Flink 1.10, but are experiencing some issues with
> memory.
> memory.
> [remainder of the quoted message, including the full stack trace, is
> identical to the original posting above and is snipped]