Posted to issues@flink.apache.org by "Xintong Song (Jira)" <ji...@apache.org> on 2020/05/26 07:09:00 UTC

[jira] [Commented] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

    [ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116500#comment-17116500 ] 

Xintong Song commented on FLINK-17923:
--------------------------------------

The problem is that RocksDB assumes it is the only managed memory consumer in streaming scenarios and tries to reserve all the managed memory in the slot, whereas a Python UDF may also reserve managed memory in streaming scenarios.

h3. Background
With FLIP-53, when generating the stream graph, we plan how managed memory should be shared among the operators within a slot.
* For batch jobs, we calculate a fraction for each operator, representing the share of the slot's managed memory that the operator should use. Operators read this fraction at runtime and reserve the corresponding amount of memory from the memory manager.
* For streaming jobs, we assumed that the only managed memory consumer is RocksDBStateBackend. Therefore, the fraction calculation (when generating the stream graph) is omitted, and RocksDBStateBackend always reserves all of the managed memory (fraction = 1) from the memory manager.
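
To make the conflict concrete, here is a minimal toy model in plain Python. It is not Flink's MemoryManager API: the class and function names are hypothetical. The byte counts are taken from the exception message in the issue description, and the assumption that the missing 82837504 bytes were reserved by the Python UDF is an inference from that message, not something the stack trace states.

```python
class MemoryReservationError(Exception):
    """Toy stand-in for a failed managed memory reservation."""


class ToySlotMemory:
    """Hypothetical model of one slot's managed memory budget."""

    def __init__(self, total_bytes):
        self.total_bytes = total_bytes
        self.remaining_bytes = total_bytes

    def reserve(self, num_bytes):
        """Reserve an absolute number of bytes, or fail if too little is left."""
        if num_bytes > self.remaining_bytes:
            raise MemoryReservationError(
                "Could not allocate %d bytes. Only %d bytes are remaining."
                % (num_bytes, self.remaining_bytes))
        self.remaining_bytes -= num_bytes

    def reserve_fraction(self, fraction):
        """Reserve a fraction of the slot's total managed memory."""
        self.reserve(int(self.total_bytes * fraction))


def simulate_conflict(total_bytes, python_udf_bytes):
    """Return the error message if RocksDB cannot reserve the whole slot."""
    slot = ToySlotMemory(total_bytes)
    # Assumption: the Python UDF reserves its share first.
    slot.reserve(python_udf_bytes)
    try:
        # Streaming assumption described above: RocksDB takes fraction = 1.
        slot.reserve_fraction(1.0)
    except MemoryReservationError as error:
        return str(error)
    return None


message = simulate_conflict(536870920, 82837504)
print(message)
```

With no other consumer in the slot (`python_udf_bytes = 0`), the same call succeeds, which is why the problem only shows up once a second managed memory consumer appears.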

h3. Potential solutions
There was an offline discussion between [~dian.fu], [~zhuzh], [~yunta] and me. Here are some ideas we came up with.
# We can say that, at the moment, we do not support using Python UDF and RocksDBStateBackend together. We can add a check at compile time and throw an exception / warning if they are used together. Given that release 1.11 is already frozen, this would avoid rushing significant changes in at the last minute. The obvious drawback is that we lose a large portion of the streaming Python UDF use cases for release 1.11.
# We can make Python UDF not reserve managed memory. Basically, Python UDF would use memory the same way other user code uses off-heap memory, and users would need to explicitly configure a larger task off-heap memory. The drawbacks of this solution are that 1) it requires more user involvement, 2) it breaks the current 1.10 behavior in batch scenarios, where such user involvement was not needed, and 3) we might need to revert the changes in the future.
# We can also calculate a fraction for RocksDBStateBackend, so that it properly shares managed memory with Python UDFs. This is probably the most appropriate solution. The problem is that there are still some open questions, such as how to calculate the fraction (because RocksDBStateBackend allocates managed memory per slot rather than per operator) and how to pass the fraction to the state backend. We are not sure whether this is doable in the 1.11 release cycle, given that it is already frozen.
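
As a rough illustration of the first and third ideas, the sketch below uses plain Python rather than Flink code. The consumer names, the weight-based split, and both function names are hypothetical; how the fraction should really be calculated and passed to the state backend is exactly the open question mentioned above.

```python
def check_supported(consumers):
    """Idea 1: reject plans that put RocksDB and a Python UDF in the same slot."""
    if "rocksdb" in consumers and "python_udf" in consumers:
        raise ValueError(
            "Python UDF and RocksDBStateBackend cannot share a slot "
            "in this hypothetical check.")


def managed_memory_fractions(weights):
    """Idea 3: turn per-consumer weights into fractions that sum to 1."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("At least one consumer needs a positive weight.")
    return {name: weight / total for name, weight in weights.items()}


# Hypothetical weights: RocksDB gets three times the share of the Python UDF.
fractions = managed_memory_fractions({"rocksdb": 3, "python_udf": 1})
print(fractions)
```

With fractions like these, RocksDBStateBackend would reserve only its own share instead of fraction = 1, leaving room for the Python UDF in the same slot.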

We would like to hear more opinions from the community. Many thanks.

> It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot  
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17923
>                 URL: https://issues.apache.org/jira/browse/FLINK-17923
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Dian Fu
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>     t_env.register_function("inc", inc)
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>          .group_by("word") \
>          .select("word, count(1) as count") \
>          .select("word, inc(count) as count") \
>          .insert_into("Results")
>     t_env.execute("word_count")
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
> 	... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> 	... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
> 	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
> 	at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
> 	at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
> 	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
> 	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
> 	... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
> 	at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
> 	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
> 	... 20 more
> {code}


