Posted to issues@flink.apache.org by "Sihua Zhou (JIRA)" <ji...@apache.org> on 2018/05/19 00:18:00 UTC
[jira] [Closed] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sihua Zhou closed FLINK-9401.
-----------------------------
Resolution: Invalid
> Data lost when rescaling the job from incremental checkpoint
> ------------------------------------------------------------
>
> Key: FLINK-9401
> URL: https://issues.apache.org/jira/browse/FLINK-9401
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
>
> We may lose data when rescaling a job from an incremental checkpoint because of the following code.
> {code:java}
> try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
>     // Encode the target's start key group as a big-endian prefix of keyGroupPrefixBytes bytes.
>     int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
>     byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
>     for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>         startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
>     }
>     // Seek to the prefix of the target's start key group.
>     iterator.seek(startKeyGroupPrefixBytes);
>     while (iterator.isValid()) {
>         // Decode the key group from the key prefix; mask with 0xFF because Java bytes are signed.
>         int keyGroup = 0;
>         for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>             keyGroup = (keyGroup << Byte.SIZE) + (iterator.key()[j] & 0xFF);
>         }
>         // Copy only entries that belong to this backend's key-group range.
>         if (stateBackend.keyGroupRange.contains(keyGroup)) {
>             stateBackend.db.put(targetColumnFamilyHandle,
>                 iterator.key(), iterator.value());
>         }
>         iterator.next();
>     }
> }
> {code}
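The prefix encoding and decoding loops above can be exercised in isolation. The following is a minimal sketch, not Flink code: the class KeyGroupPrefix and its methods are hypothetical, and the 2-byte prefix is an assumed value. It round-trips key groups through the big-endian prefix and also shows why the decode step needs & 0xFF: Java's byte is signed, so adding a raw prefix byte of 0x80 or above produces a negative key group.

{code:java}
class KeyGroupPrefix {

    // Writes keyGroup as a big-endian prefix of prefixBytes bytes
    // (same shifting scheme as the encoding loop in the quoted code).
    static byte[] encode(int keyGroup, int prefixBytes) {
        byte[] prefix = new byte[prefixBytes];
        for (int j = 0; j < prefixBytes; ++j) {
            prefix[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
        }
        return prefix;
    }

    // Reads the key group back from the first prefixBytes bytes of key,
    // masking each signed byte to its unsigned value before adding it.
    static int decode(byte[] key, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + (key[j] & 0xFF);
        }
        return keyGroup;
    }

    // Same as decode, but adds the raw (sign-extended) byte, for comparison.
    static int decodeUnmasked(byte[] key, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + key[j];
        }
        return keyGroup;
    }

    public static void main(String[] args) {
        int prefixBytes = 2; // hypothetical prefix width
        for (int keyGroup : new int[] {0, 127, 128, 200, 300}) {
            byte[] prefix = encode(keyGroup, prefixBytes);
            System.out.println(keyGroup + " -> masked " + decode(prefix, prefixBytes)
                + ", unmasked " + decodeUnmasked(prefix, prefixBytes));
        }
    }
}
{code}

With a 2-byte prefix, key group 200 is encoded as {0x00, 0xC8}; decode returns 200, while decodeUnmasked returns -56.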
> For every state handle from which we fetch the target data, we call _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could become INVALID immediately if the state handle's _start key group_ is bigger than _state.keyGroupRange.getStartKeyGroup()_. As a result, data is lost.
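RocksDB's seek(target) positions the iterator at the first key at or after target; if no such key exists, the iterator is not valid. The sketch below is a hypothetical model, not Flink or RocksDB code: a java.util.TreeMap with an unsigned bytewise comparator stands in for the column family, tailMap(seekKey, true) stands in for seek(), and keys use an assumed 1-byte key-group prefix followed by one payload byte. It can be used to check which key groups the restore loop copies for a given handle range and target range.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class SeekSketch {

    // Unsigned lexicographic byte order (the order RocksDB's default bytewise comparator uses).
    static final Comparator<byte[]> BYTEWISE = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Hypothetical key layout: 1-byte key-group prefix followed by one payload byte.
    static byte[] key(int keyGroup, int payload) {
        return new byte[] {(byte) keyGroup, (byte) payload};
    }

    // Models the quoted loop: seek to the target's start key group, scan to the end,
    // and keep only keys whose key group lies in [targetStart, targetEnd].
    static List<Integer> restore(NavigableMap<byte[], byte[]> handle, int targetStart, int targetEnd) {
        List<Integer> restoredKeyGroups = new ArrayList<>();
        byte[] seekKey = new byte[] {(byte) targetStart};
        // tailMap(seekKey, true) starts at the first key >= seekKey, like seek().
        for (Map.Entry<byte[], byte[]> entry : handle.tailMap(seekKey, true).entrySet()) {
            int keyGroup = entry.getKey()[0] & 0xFF;
            if (keyGroup >= targetStart && keyGroup <= targetEnd) {
                restoredKeyGroups.add(keyGroup);
            }
        }
        return restoredKeyGroups;
    }

    public static void main(String[] args) {
        // Hypothetical handle holding key groups 5..9, restored into target range 0..7.
        NavigableMap<byte[], byte[]> handle = new TreeMap<>(BYTEWISE);
        for (int keyGroup = 5; keyGroup <= 9; keyGroup++) {
            handle.put(key(keyGroup, 0), new byte[] {1});
        }
        System.out.println(restore(handle, 0, 7));
    }
}
{code}

For a handle holding key groups 5 to 9 restored into target range 0 to 7, main prints [5, 6, 7]. With a target range of 10 to 20, the modeled seek finds no key and nothing is copied.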
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)