Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2020/05/25 09:34:00 UTC
[jira] [Created] (FLINK-17923) It will throw
MemoryAllocationException if rocksdb statebackend and Python UDF are used
in the same slot
Dian Fu created FLINK-17923:
-------------------------------
Summary: It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
Key: FLINK-17923
URL: https://issues.apache.org/jira/browse/FLINK-17923
Project: Flink
Issue Type: Bug
Components: API / Python, Runtime / State Backends
Affects Versions: 1.10.0, 1.11.0
Reporter: Dian Fu
Fix For: 1.11.0
For the following job:
{code}
import logging
import os
import shutil
import sys
import tempfile

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one " \
              "line or more contributor license agreements See the NOTICE file " \
              "line distributed with this work for additional information " \
              "line regarding copyright ownership The ASF licenses this file " \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"

    t_config = TableConfig()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env, t_config)

    # register Results table in table environment
    tmp_dir = tempfile.gettempdir()
    result_path = tmp_dir + '/result'
    if os.path.exists(result_path):
        try:
            if os.path.isfile(result_path):
                os.remove(result_path)
            else:
                shutil.rmtree(result_path)
        except OSError as e:
            logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)

    logging.info("Results directory: %s", result_path)

    sink_ddl = """
        create table Results(
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'blackhole'
        )
        """
    t_env.sql_update(sink_ddl)

    @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
    def inc(count):
        return count + 1
    t_env.register_function("inc", inc)

    elements = [(word, 1) for word in content.split(" ")]
    t_env.from_elements(elements, ["word", "count"]) \
        .group_by("word") \
        .select("word, count(1) as count") \
        .select("word, inc(count) as count") \
        .insert_into("Results")

    t_env.execute("word_count")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()
{code}
It will throw the following exception:
{code}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
    ... 9 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
    ... 15 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
    at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
    ... 20 more
{code}
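For reference, the numbers in the innermost exception can be put into MiB with a small sketch. The two byte counts are copied from the trace above; the variable names are illustrative only and are not part of Flink. Which component had already reserved the missing memory is not stated in the trace, so the sketch only computes the size of the gap.

```python
# Byte counts copied verbatim from the MemoryReservationException above.
requested_bytes = 536870920   # size RocksDB tried to reserve for its shared cache
remaining_bytes = 454033416   # managed memory still available in the slot

MIB = 1024 * 1024

# Managed memory that was already reserved when RocksDB asked for its share.
shortfall_bytes = requested_bytes - remaining_bytes

print("requested: %.2f MiB" % (requested_bytes / MIB))
print("remaining: %.2f MiB" % (remaining_bytes / MIB))
print("shortfall: %d bytes (%.2f MiB)" % (shortfall_bytes, shortfall_bytes / MIB))
```

The request is just over 512 MiB while only about 433 MiB is left, so the reservation falls short by 79 MiB.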