You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "wuzhiyu (Jira)" <ji...@apache.org> on 2021/12/21 08:39:00 UTC
[jira] [Created] (FLINK-25400) RocksDBStateBackend configurations does not work with SavepointEnvironment
wuzhiyu created FLINK-25400:
-------------------------------
Summary: RocksDBStateBackend configurations does not work with SavepointEnvironment
Key: FLINK-25400
URL: https://issues.apache.org/jira/browse/FLINK-25400
Project: Flink
Issue Type: Bug
Components: API / State Processor
Affects Versions: 1.12.2
Reporter: wuzhiyu
Hi~
I'm trying to use flink-state-processor-api to do state migrations by reading states from an existing savepoint, and writing them into a new savepoint after certain transformations.
However, the reading rate does not meet my expectation.
When I tried to tune RocksDB by enabling RocksDB native metrics, I found it did not work.
So I did some debug, I found when the job is running under a SavepointEnvironment, no RocksDBStatebackend configurations will be passed to RocksDBStateBackend.
The whole process is described as below (code demonstrated is under version release-1.12.2):
First, when org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is invoked:
{code:java}
// org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
private StateBackend createStateBackend() throws Exception {
final StateBackend fromApplication =
configuration.getStateBackend(getUserCodeClassLoader());
return StateBackendLoader.fromApplicationOrConfigOrDefault(
fromApplication,
getEnvironment().getTaskManagerInfo().getConfiguration(),
getUserCodeClassLoader(),
LOG); {code}
*getEnvironment()* returns a SavepointEnvironment instance.
And then *org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo* is invoked, it returns a new *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo* instance.
{code:java}
// org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return new SavepointTaskManagerRuntimeInfo(getIOManager());
} {code}
At last, *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration* is invoked. It returns an empty configuration, which means all configurations will be lost.
{code:java}
// org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
@Override
public Configuration getConfiguration() {
return new Configuration();
} {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)