Posted to issues@flink.apache.org by "Xintong Song (Jira)" <ji...@apache.org> on 2020/12/17 09:40:00 UTC

[jira] [Comment Edited] (FLINK-20646) ReduceTransformation does not work with RocksDBStateBackend

    [ https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250920#comment-17250920 ] 

Xintong Song edited comment on FLINK-20646 at 12/17/20, 9:39 AM:
-----------------------------------------------------------------

I think this bug reveals two problems.
* Absence of a common abstraction for "stateful transformations". Relying on each concrete transformation implementation to maintain the key selectors and declare the memory use case is fragile. New transformation implementations can easily overlook this, which is exactly what happened here.
* Lack of testing. I'm wondering how this problem escaped our release testing. A reduce operation with the RocksDB state backend should not be that rare a case, yet obviously we don't have test coverage for such scenarios.

As for a quick fix, we can simply call {{updateManagedMemoryStateBackendUseCase}} in {{ReduceTransformation}}. I reviewed the type hierarchy of {{Transformation}} and did not find any other sub-classes with this problem.
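
To make that concrete, here is a rough sketch of the quick fix. The constructor shape and the field/parameter names are illustrative and may not match the actual {{ReduceTransformation}} code; I'm also assuming the boolean form of {{updateManagedMemoryStateBackendUseCase}} that the other keyed transformations use when their key selector is set.

{code}
// Sketch only: illustrative ReduceTransformation constructor, not the real class.
public ReduceTransformation(
        String name,
        int parallelism,
        Transformation<IN> input,
        ReduceFunction<IN> reducer,
        KeySelector<IN, K> keySelector,
        TypeInformation<K> keyTypeInfo) {
    super(name, input.getOutputType(), parallelism);
    this.input = input;
    this.reducer = reducer;
    this.keySelector = keySelector;
    this.keyTypeInfo = keyTypeInfo;

    // The missing piece: declare the slot-scoped state backend managed memory
    // use case for keyed (stateful) transformations, so that RocksDB gets a
    // non-zero managed memory fraction at runtime.
    updateManagedMemoryStateBackendUseCase(keySelector != null);
}
{code}

Longer term, this kind of call should probably live behind the "stateful transformation" abstraction mentioned above, rather than be repeated in every concrete transformation.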


> ReduceTransformation does not work with RocksDBStateBackend
> -----------------------------------------------------------
>
>                 Key: FLINK-20646
>                 URL: https://issues.apache.org/jira/browse/FLINK-20646
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.12.0
>            Reporter: Xintong Song
>            Assignee: Xintong Song
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.1
>
>
> The intra-slot managed memory sharing (FLIP-141) requires transformations to properly declare their managed memory use cases.
> For the RocksDB state backend, this requires all {{Transformation}}-s on a keyed stream (i.e. with a non-null {{KeySelector}}) to call {{Transformation#updateManagedMemoryStateBackendUseCase}}, which the newly introduced {{ReduceTransformation}} does not.
> As a result, Flink does not reserve managed memory for operators converted from {{ReduceTransformation}} (FLINK-19931), leading to the following failure when the RocksDB state backend is used.
> {code}
> 16:58:49,373 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for StreamGroupedReduceOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from alternative (1/1), will retry while more alternatives are available.
> java.io.IOException: Failed to acquire shared cache resource for RocksDB
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:264) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-runtime_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-runtime_2.11-1.12.0.jar:1.12.0]
> 	at java.lang.Thread.run(Thread.java:832) [?:?]
> Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
> 	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:164) ~[flink-core-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:631) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:612) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:499) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:260) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> 	... 16 more
> {code}
> The problem was reported on the user-zh mailing list (the thread is in Chinese):
> http://apache-flink.147419.n8.nabble.com/flink-1-12-RocksDBStateBackend-td9504.html
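> For reference, a minimal job of the reported shape hits the failure. This is a sketch only: the class name and checkpoint path are illustrative, and it assumes {{flink-statebackend-rocksdb}} is on the classpath with default managed memory settings.
> {code}
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class ReduceWithRocksDBRepro {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Managed memory for RocksDB is on by default; no extra tuning is needed to hit the bug.
>         env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints"));
>
>         // keyBy + reduce goes through the new ReduceTransformation, which never
>         // declares the STATE_BACKEND managed memory use case.
>         env.fromElements("a", "a", "b")
>                 .keyBy(value -> value)
>                 .reduce((a, b) -> a + b)
>                 .print();
>
>         env.execute("reduce-with-rocksdb-repro");
>     }
> }
> {code}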



--
This message was sent by Atlassian Jira
(v8.3.4#803005)