Posted to user@flink.apache.org by Schwalbe Matthias <Ma...@viseca.ch> on 2022/03/08 12:43:46 UTC

Savepoint API challenged with large savepoints

Dear Flink Team,

In the last weeks I was faced with a large savepoint (around 40 GiB) that contained lots of obsolete data points and overwhelmed our infrastructure (i.e. it failed to load/restart).
We could not afford to lose the state, hence I spent the time to transcode the savepoint into something smaller (ended up with 2.5 GiB).
During my efforts I encountered a couple of points that make the savepoint API struggle with larger savepoints, and found simple solutions ...

I would like to contribute my findings and 'fixes', however on my corporate infrastructure I cannot fork/build Flink locally nor PR the changes later on.

Before creating Jira tickets I wanted to quickly discuss the matter.

Findings:


  *   (We are currently on Flink 1.13 (RocksDB state backend), but all findings apply to the latest version as well)
  *   WritableSavepoint.write(...) falls back to JobManagerCheckpointStorage, which restricts savepoint size to 5 MiB
     *   See relevant exception stack here [1]
     *   This is because SavepointTaskManagerRuntimeInfo.getConfiguration() always returns an empty Configuration, hence
     *   neither "state.checkpoint-storage" nor "state.checkpoints.dir" is/can be configured
     *   'fix': provide SavepointTaskManagerRuntimeInfo.getConfiguration() with a meaningful implementation and set the configuration in SavepointEnvironment.getTaskManagerInfo()
  *   When loading a state, MultiStateKeyIterator loads and buffers the whole state in memory before it even processes a single data point
     *   This is absolutely no problem for small state (hence the unit tests work fine)
     *   The MultiStateKeyIterator constructor sets up a Java Stream that iterates all state descriptors and flattens all data points contained within
     *   The java.util.stream.Stream#flatMap function causes the buffering of the whole data set when enumerated later on
     *   See call stack [2]
        *   In our case this is 150e6 data points (> 1 GiB just for the pointers to the data, let alone the data itself, ~30 GiB)
     *   I'm not aware of any way to instrument Stream in order to avoid the problem, hence
     *   I coded an alternative implementation of MultiStateKeyIterator that avoids using Java Stream
     *   I can contribute our implementation (MultiStateKeyIteratorNoStreams)
  *   I found out that, at least when using LocalFileSystem on a Windows system, read I/O to load a savepoint is unbuffered
     *   See example stack [3]
     *   i.e. in order to load just a long in a serializer, it needs to go into kernel mode 8 times and load the 8 bytes one by one
     *   I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered reads on any FileSystem implementation
     *   In our setting savepoint load is now 30 times faster
     *   I once saw a Jira ticket about improving savepoint load time in general (lost the link unfortunately); maybe this approach can help with it
     *   Not sure whether HDFS has the same problem
     *   I can contribute my implementation

Looking forward to your comments


Matthias (Thias) Schwalbe


[1] exception stack:
8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR BatchTask - Error in task code:  MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
            at java.util.concurrent.FutureTask.report(FutureTask.java:122)
            at java.util.concurrent.FutureTask.get(FutureTask.java:192)
            at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
            at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
            at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
            at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
            at org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
            at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
            at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:107)
            at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
            at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
            at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
            at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:61)
            at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:141)
            at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:121)
            at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75)
            at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:87)
            at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
            at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
            at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
            at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
            at java.util.concurrent.FutureTask.run(FutureTask.java)
            at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
            ... 14 more
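
As an illustration of the failure in [1], here is a minimal Java sketch of the fallback described in the findings: with an empty configuration no checkpoint directory is known, so only the size-limited in-memory storage remains. This is not Flink code. The class StorageFallbackSketch, its methods, and the example directory are hypothetical stand-ins, and the selection logic is deliberately simplified. Only the config key "state.checkpoints.dir" and the two numbers are taken from the message and the stack above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in, not Flink code: models the fallback to size-limited in-memory storage. */
final class StorageFallbackSketch {

    /** 5 MiB, the maxSize=5242880 reported in the exception above. */
    static final long MAX_IN_MEMORY_STATE_BYTES = 5L * 1024 * 1024;

    /** Simplified assumption: without a checkpoint directory only in-memory storage is left. */
    static String chooseStorage(Map<String, String> conf) {
        return conf.containsKey("state.checkpoints.dir") ? "filesystem" : "jobmanager";
    }

    /** True if a state of the given size could be written with the chosen storage. */
    static boolean fits(Map<String, String> conf, long stateSizeBytes) {
        boolean inMemory = "jobmanager".equals(chooseStorage(conf));
        return !inMemory || stateSizeBytes <= MAX_IN_MEMORY_STATE_BYTES;
    }

    public static void main(String[] args) {
        long stateSize = 180075318L; // Size=180075318 from the exception above

        // What an always-empty configuration amounts to.
        Map<String, String> empty = Collections.emptyMap();

        // What a passed-through configuration could look like; the path is a made-up example.
        Map<String, String> configured = new HashMap<>();
        configured.put("state.checkpoints.dir", "file:///tmp/savepoint-out");

        System.out.println(fits(empty, stateSize));      // prints false
        System.out.println(fits(configured, stateSize)); // prints true
    }
}
```

With a configuration that carries "state.checkpoints.dir", the same 180075318 bytes would go to file-based storage, which is what the proposed 'fix' aims to make possible.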

[2] Streams call stack:
hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream)                                       # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream)                                      # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream)                                       # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
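
The stack in [2] shows the flatMap stage draining a whole inner iterator (forEachRemaining) on the first hasNext() call. As a sketch of a stream-free alternative, the following Java class flattens several iterators lazily, pulling one element at a time. It is not the MultiStateKeyIteratorNoStreams mentioned above, whose code is not part of this thread; all class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical sketch, not Flink code: lazy flattening without java.util.stream. */
final class LazyFlattenSketch {

    /** Walks the sources one after another and opens each source only when it is reached. */
    static final class LazyFlatteningIterator<T> implements Iterator<T> {
        private final Iterator<Supplier<Iterator<T>>> sources;
        private Iterator<T> current = Collections.emptyIterator();

        LazyFlatteningIterator(List<Supplier<Iterator<T>>> sources) {
            this.sources = sources.iterator();
        }

        @Override
        public boolean hasNext() {
            // Skip exhausted or empty sources; nothing is copied into a buffer.
            while (!current.hasNext() && sources.hasNext()) {
                current = sources.next().get();
            }
            return current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }

    /** Produces 0..limit-1 and records in a shared counter how many elements were pulled. */
    static final class CountingIterator implements Iterator<Long> {
        private final long limit;
        private final AtomicLong pulled;
        private long next;

        CountingIterator(long limit, AtomicLong pulled) {
            this.limit = limit;
            this.pulled = pulled;
        }

        @Override
        public boolean hasNext() {
            return next < limit;
        }

        @Override
        public Long next() {
            if (next >= limit) {
                throw new NoSuchElementException();
            }
            pulled.incrementAndGet();
            return next++;
        }
    }

    private static Iterator<Long> twoSources(long perSource, AtomicLong pulled) {
        List<Supplier<Iterator<Long>>> sources = new ArrayList<>();
        sources.add(() -> new CountingIterator(perSource, pulled));
        sources.add(() -> new CountingIterator(perSource, pulled));
        return new LazyFlatteningIterator<>(sources);
    }

    /** Number of elements pulled from the sources after consuming a single element. */
    static long pulledAfterFirstElement(long perSource) {
        AtomicLong pulled = new AtomicLong();
        twoSources(perSource, pulled).next();
        return pulled.get();
    }

    /** Total number of elements seen when iterating to the end. */
    static long countAll(long perSource) {
        Iterator<Long> it = twoSources(perSource, new AtomicLong());
        long count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(pulledAfterFirstElement(1_000_000L)); // prints 1
        System.out.println(countAll(1_000L));                    // prints 2000
    }
}
```

The point of the design is that memory use stays independent of the number of data points: the iterator holds a reference to the current source only, never a buffer of already-read elements.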

[3] unbuffered reads stack:
read:207, FileInputStream (java.io)
read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
read:42, ForwardingInputStream (org.apache.flink.runtime.util)
readInt:390, DataInputStream (java.io)
deserialize:80, BytePrimitiveArraySerializer (org.apache.flink.api.common.typeutils.base.array)
next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
restoreKVStateData:147, RocksDBFullRestoreOperation (org.apache.flink.contrib.streaming.state.restore)
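
To make the effect in [3] tangible, here is a small Java sketch that counts how often the underlying stream is asked for data when 1000 ints are read through a DataInputStream, once directly and once through java.io.BufferedInputStream. It is not the BufferedFSDataInputStreamWrapper mentioned above, whose code is not part of this thread; the class names, the in-memory data source, and the 8192-byte buffer size are assumptions made for illustration. The counter stands in for the transitions into kernel mode; the exact unbuffered count depends on the JDK version.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch, not Flink code: compares unbuffered and buffered reads. */
final class BufferedReadSketch {

    /** Counts every read request that reaches the wrapped stream. */
    static final class CountingInputStream extends FilterInputStream {
        long calls;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return super.read(b, off, len);
        }
    }

    /** Serializes the ints 0..n-1 in big-endian form, 4 bytes each. */
    static byte[] encodeInts(int n) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < n; i++) {
                out.writeInt(i);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads n ints back and returns how many read requests hit the source stream. */
    static long countReadCalls(byte[] data, int n, boolean buffered) {
        CountingInputStream source = new CountingInputStream(new ByteArrayInputStream(data));
        InputStream in = buffered ? new BufferedInputStream(source, 8192) : source;
        try (DataInputStream dataIn = new DataInputStream(in)) {
            for (int i = 0; i < n; i++) {
                if (dataIn.readInt() != i) {
                    throw new IllegalStateException("unexpected value at index " + i);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return source.calls;
    }

    public static void main(String[] args) {
        int n = 1000;
        byte[] data = encodeInts(n);
        System.out.println("unbuffered read calls: " + countReadCalls(data, n, false));
        System.out.println("buffered read calls:   " + countReadCalls(data, n, true));
    }
}
```

In this sketch the 4000 bytes fit into a single buffer fill, so the buffered variant reaches the source once, while the unbuffered variant reaches it at least once per value.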



This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.

RE: Savepoint API challenged with large savepoints

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Thanks, Chesnay,

I just created the 3 tickets (in my clumsy way):

  *   FLINK-26584<https://issues.apache.org/jira/browse/FLINK-26584> : State Processor API fails to write savepoints exceeding 5MB
  *   FLINK-26585<https://issues.apache.org/jira/browse/FLINK-26585> : State Processor API: Loading a state set buffers the whole state set in memory before starting to process
  *   FLINK-26586<https://issues.apache.org/jira/browse/FLINK-26586> : FileSystem uses unbuffered read I/O

I’ll be off the week starting Mar 21, but otherwise I'm ready to discuss matters

Thias




From: Chesnay Schepler <ch...@apache.org>
Sent: Thursday, 10 March 2022 10:47
To: Schwalbe Matthias <Ma...@viseca.ch>; user@flink.apache.org
Subject: Re: Savepoint API challenged with large savepoints

That all sounds very interesting; I'd go ahead with creating tickets.


Re: Savepoint API challenged with large savepoints

Posted by Chesnay Schepler <ch...@apache.org>.
That all sounds very interesting; I'd go ahead with creating tickets.
