Posted to dev@flink.apache.org by "wuzhiyu (Jira)" <ji...@apache.org> on 2021/12/21 08:39:00 UTC

[jira] [Created] (FLINK-25400) RocksDBStateBackend configurations does not work with SavepointEnvironment

wuzhiyu created FLINK-25400:
-------------------------------

             Summary: RocksDBStateBackend configurations does not work with SavepointEnvironment
                 Key: FLINK-25400
                 URL: https://issues.apache.org/jira/browse/FLINK-25400
             Project: Flink
          Issue Type: Bug
          Components: API / State Processor
    Affects Versions: 1.12.2
            Reporter: wuzhiyu


Hi~

I'm trying to use flink-state-processor-api to do state migrations by reading states from an existing savepoint, and writing them into a new savepoint after certain transformations.

However, the read rate does not meet my expectations.

When I tried to tune RocksDB by enabling RocksDB native metrics, I found that the setting had no effect.

After some debugging, I found that when the job runs under a SavepointEnvironment, no RocksDBStateBackend configuration is passed to the RocksDBStateBackend.

The whole process is described below (the code shown is from release-1.12.2):

First, when org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is invoked:

 
{code:java}
// org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
private StateBackend createStateBackend() throws Exception {
    final StateBackend fromApplication =
            configuration.getStateBackend(getUserCodeClassLoader());

    return StateBackendLoader.fromApplicationOrConfigOrDefault(
            fromApplication,
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getUserCodeClassLoader(),
            LOG);
} {code}
*getEnvironment()* returns a SavepointEnvironment instance.

 

Then *org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo* is invoked, which returns a new *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo* instance.

 
{code:java}
// org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    return new SavepointTaskManagerRuntimeInfo(getIOManager());
} {code}
 

Finally, *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration* is invoked. It returns an empty configuration, which means every configured option is lost.
{code:java}
// org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
@Override
public Configuration getConfiguration() {
    return new Configuration();
} {code}
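
To make the effect of the call chain above easier to see outside of Flink, here is a minimal standalone sketch. It does not use any Flink classes: RuntimeInfo, EmptyRuntimeInfo, ForwardingRuntimeInfo and nativeMetricEnabled are hypothetical stand-ins, and a plain Map stands in for Flink's Configuration. The only name taken from Flink is the option key state.backend.rocksdb.metrics.estimate-num-keys. ForwardingRuntimeInfo is only one possible way to carry options through, not a proposed patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConfigLossSketch {

    // Flink option key for one of the RocksDB native metrics.
    static final String METRIC_KEY = "state.backend.rocksdb.metrics.estimate-num-keys";

    /** Hypothetical stand-in for TaskManagerRuntimeInfo. */
    interface RuntimeInfo {
        Map<String, String> getConfiguration();
    }

    /** Mirrors the reported behavior: always hands out a fresh, empty configuration. */
    static final class EmptyRuntimeInfo implements RuntimeInfo {
        @Override
        public Map<String, String> getConfiguration() {
            return new HashMap<>();
        }
    }

    /** Hypothetical alternative: forwards the options supplied by the caller. */
    static final class ForwardingRuntimeInfo implements RuntimeInfo {
        private final Map<String, String> configuration;

        ForwardingRuntimeInfo(Map<String, String> configuration) {
            // Defensive copy so later changes by the caller do not leak in.
            this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
        }

        @Override
        public Map<String, String> getConfiguration() {
            return configuration;
        }
    }

    /** Stand-in for the backend loader: it can only see what the runtime info exposes. */
    static boolean nativeMetricEnabled(RuntimeInfo info) {
        return Boolean.parseBoolean(info.getConfiguration().getOrDefault(METRIC_KEY, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put(METRIC_KEY, "true");

        // The user's option never reaches the loader through the empty runtime info.
        System.out.println(nativeMetricEnabled(new EmptyRuntimeInfo()));
        // With a forwarding runtime info, the same option is visible.
        System.out.println(nativeMetricEnabled(new ForwardingRuntimeInfo(userConfig)));
    }
}
```

With the empty runtime info the option reads as disabled no matter what the user configured, which matches what I observed with the native metrics.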
 


