You are viewing a plain-text version of this content. The canonical version is the original archive page (its hyperlink is not preserved in this plain-text rendering).
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 11:50:08 UTC
[1/5] flink git commit: [FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry
Repository: flink
Updated Branches:
refs/heads/master b54f44888 -> 44fb035e0
[FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fb035e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fb035e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fb035e
Branch: refs/heads/master
Commit: 44fb035e021259986f0b1aac4126143510a758e5
Parents: 098e46f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri May 12 16:01:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/AbstractCompletedCheckpointStore.java | 6 ++++++
.../runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java | 2 ++
.../org/apache/flink/runtime/state/SharedStateRegistry.java | 6 +++++-
3 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
index f42fd06..bf70501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import java.util.concurrent.Executor;
+
/**
* This is the base class that provides implementation of some aspects common for all
* {@link CompletedCheckpointStore}s.
@@ -34,4 +36,8 @@ public abstract class AbstractCompletedCheckpointStore implements CompletedCheck
public AbstractCompletedCheckpointStore() {
this.sharedStateRegistry = new SharedStateRegistry();
}
+
+ public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
+ this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 52a4eea..c8c68bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -106,6 +106,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor) throws Exception {
+ super(executor);
+
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
checkNotNull(stateStorage, "State storage");
http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 9cfdec7..f9161b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -45,8 +45,12 @@ public class SharedStateRegistry {
private final Executor asyncDisposalExecutor;
public SharedStateRegistry() {
+ this(Executors.directExecutor());
+ }
+
+ public SharedStateRegistry(Executor asyncDisposalExecutor) {
this.registeredStates = new HashMap<>();
- this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
+ this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
}
/**
[4/5] flink git commit: [FLINK-6545] [checkpoint] Make incremental checkpoints externalizable
Posted by sr...@apache.org.
[FLINK-6545] [checkpoint] Make incremental checkpoints externalizable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/098e46f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/098e46f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/098e46f2
Branch: refs/heads/master
Commit: 098e46f2d222e8e6c5c27bc7ded40ee642dad104
Parents: efbb41b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu May 11 21:04:29 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200
----------------------------------------------------------------------
.../RocksDBIncrementalKeyedStateHandle.java | 241 --------------
.../state/RocksDBKeyedStateBackend.java | 155 ++++-----
.../org/apache/flink/util/StringBasedID.java | 69 ++++
.../savepoint/SavepointV2Serializer.java | 83 ++++-
.../state/IncrementalKeyedStateHandle.java | 324 +++++++++++++++++++
.../state/PlaceholderStreamStateHandle.java | 88 +++++
.../flink/runtime/state/SharedStateHandle.java | 39 ---
.../runtime/state/SharedStateRegistry.java | 2 +-
.../runtime/state/SharedStateRegistryKey.java | 42 +--
.../apache/flink/runtime/state/StateHandle.java | 37 ---
.../flink/runtime/state/StateHandleID.java | 37 +++
.../savepoint/CheckpointTestUtils.java | 87 +++--
.../savepoint/SavepointV2SerializerTest.java | 1 -
.../flink/runtime/state/StateUtilTest.java | 36 ---
14 files changed, 736 insertions(+), 505 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
deleted file mode 100644
index 961182d..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.runtime.state.CompositeStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.SharedStateRegistryKey;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}.
- *
- * The states contained in an incremental snapshot include
- * <ul>
- * <li> New SST state which includes the sst files produced since the last completed
- * checkpoint. These files can be referenced by succeeding checkpoints if the
- * checkpoint succeeds to complete. </li>
- * <li> Old SST state which includes the sst files materialized in previous
- * checkpoints. </li>
- * <li> MISC state which include the other files in the RocksDB instance, e.g. the
- * LOG and MANIFEST files. These files are mutable, hence cannot be shared by
- * other checkpoints. </li>
- * <li> Meta state which includes the information of existing states. </li>
- * </ul>
- */
-public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
-
- private static final Logger LOG = LoggerFactory.getLogger(RocksDBIncrementalKeyedStateHandle.class);
-
- private static final long serialVersionUID = -8328808513197388231L;
-
- private final String operatorIdentifier;
-
- private final KeyGroupRange keyGroupRange;
-
- private final long checkpointId;
-
- private final Map<String, StreamStateHandle> unregisteredSstFiles;
-
- private final Map<String, StreamStateHandle> registeredSstFiles;
-
- private final Map<String, StreamStateHandle> miscFiles;
-
- private final StreamStateHandle metaStateHandle;
-
- /**
- * True if the state handle has already registered shared states.
- *
- * Once the shared states are registered, it's the {@link SharedStateRegistry}'s
- * responsibility to maintain the shared states. But in the cases where the
- * state handle is discarded before performing the registration, the handle
- * should delete all the shared states created by it.
- */
- private boolean registered;
-
- RocksDBIncrementalKeyedStateHandle(
- String operatorIdentifier,
- KeyGroupRange keyGroupRange,
- long checkpointId,
- Map<String, StreamStateHandle> unregisteredSstFiles,
- Map<String, StreamStateHandle> registeredSstFiles,
- Map<String, StreamStateHandle> miscFiles,
- StreamStateHandle metaStateHandle) {
-
- this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
- this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
- this.checkpointId = checkpointId;
- this.unregisteredSstFiles = Preconditions.checkNotNull(unregisteredSstFiles);
- this.registeredSstFiles = Preconditions.checkNotNull(registeredSstFiles);
- this.miscFiles = Preconditions.checkNotNull(miscFiles);
- this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
- this.registered = false;
- }
-
- @Override
- public KeyGroupRange getKeyGroupRange() {
- return keyGroupRange;
- }
-
- long getCheckpointId() {
- return checkpointId;
- }
-
- Map<String, StreamStateHandle> getUnregisteredSstFiles() {
- return unregisteredSstFiles;
- }
-
- Map<String, StreamStateHandle> getRegisteredSstFiles() {
- return registeredSstFiles;
- }
-
- Map<String, StreamStateHandle> getMiscFiles() {
- return miscFiles;
- }
-
- StreamStateHandle getMetaStateHandle() {
- return metaStateHandle;
- }
-
- @Override
- public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
- if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
- return this;
- } else {
- return null;
- }
- }
-
- @Override
- public void discardState() throws Exception {
-
- Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. Must unregister first.");
-
- try {
- metaStateHandle.discardState();
- } catch (Exception e) {
- LOG.warn("Could not properly discard meta data.", e);
- }
-
- try {
- StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard misc file states.", e);
- }
-
- try {
- StateUtil.bestEffortDiscardAllStateObjects(unregisteredSstFiles.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard new sst file states.", e);
- }
-
- }
-
- @Override
- public long getStateSize() {
- long size = StateUtil.getStateSize(metaStateHandle);
-
- for (StreamStateHandle newSstFileHandle : unregisteredSstFiles.values()) {
- size += newSstFileHandle.getStateSize();
- }
-
- for (StreamStateHandle oldSstFileHandle : registeredSstFiles.values()) {
- size += oldSstFileHandle.getStateSize();
- }
-
- for (StreamStateHandle miscFileHandle : miscFiles.values()) {
- size += miscFileHandle.getStateSize();
- }
-
- return size;
- }
-
- @Override
- public void registerSharedStates(SharedStateRegistry stateRegistry) {
-
- Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
-
- for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-
- SharedStateRegistry.Result result =
- stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
-
- // We update our reference with the result from the registry, to prevent the following
- // problem:
- // A previous checkpoint n has already registered the state. This can happen if a
- // following checkpoint (n + x) wants to reference the same state before the backend got
- // notified that checkpoint n completed. In this case, the shared registry did
- // deduplication and returns the previous reference.
- newSstFileEntry.setValue(result.getReference());
- }
-
- for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
-
- SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
-
- // Again we update our state handle with the result from the registry, thus replacing
- // placeholder state handles with the originals.
- oldSstFileName.setValue(result.getReference());
- }
-
- // Migrate state from unregistered to registered, so that it will not count as private state
- // for #discardState() from now.
- registeredSstFiles.putAll(unregisteredSstFiles);
- unregisteredSstFiles.clear();
-
- registered = true;
- }
-
- @Override
- public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-
- Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
-
- for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
- stateRegistry.releaseReference(registryKey);
- }
-
- for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : registeredSstFiles.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
- stateRegistry.releaseReference(registryKey);
- }
-
- registered = false;
- }
-
- private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(String fileName) {
- return new SharedStateRegistryKey(operatorIdentifier + "-" + keyGroupRange + "-" + fileName);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index b9468f7..4bd94fd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -55,13 +55,16 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateMigrationUtil;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
@@ -72,7 +75,6 @@ import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
@@ -172,7 +174,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final boolean enableIncrementalCheckpointing;
/** The sst files materialized in pending checkpoints */
- private final SortedMap<Long, Map<String, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
+ private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
/** The identifier of the last completed checkpoint */
private long lastCompletedCheckpointId = -1;
@@ -723,7 +725,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final long checkpointTimestamp;
/** All sst files that were part of the last previously completed checkpoint */
- private Map<String, StreamStateHandle> baseSstFiles;
+ private Map<StateHandleID, StreamStateHandle> baseSstFiles;
/** The state meta data */
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
@@ -735,13 +737,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final CloseableRegistry closeableRegistry = new CloseableRegistry();
// new sst files since the last completed checkpoint
- private final Map<String, StreamStateHandle> newSstFiles = new HashMap<>();
+ private final Map<StateHandleID, StreamStateHandle> newSstFiles = new HashMap<>();
// old sst files which have been materialized in previous completed checkpoints
- private final Map<String, StreamStateHandle> oldSstFiles = new HashMap<>();
+ private final Map<StateHandleID, StreamStateHandle> oldSstFiles = new HashMap<>();
// handles to the misc files in the current snapshot
- private final Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+ private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
private StreamStateHandle metaStateHandle = null;
@@ -865,8 +867,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
if (fileStatuses != null) {
for (FileStatus fileStatus : fileStatuses) {
- Path filePath = fileStatus.getPath();
- String fileName = filePath.getName();
+ final Path filePath = fileStatus.getPath();
+ final String fileName = filePath.getName();
+ final StateHandleID stateHandleID = new StateHandleID(fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
StreamStateHandle fileHandle =
@@ -874,20 +877,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (fileHandle == null) {
fileHandle = materializeStateData(filePath);
- newSstFiles.put(fileName, fileHandle);
+ newSstFiles.put(stateHandleID, fileHandle);
} else {
// we introduce a placeholder state handle, that is replaced with the
// original from the shared state registry (created from a previous checkpoint)
- oldSstFiles.put(fileName, new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
+ oldSstFiles.put(
+ stateHandleID,
+ new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
}
} else {
StreamStateHandle fileHandle = materializeStateData(filePath);
- miscFiles.put(fileName, fileHandle);
+ miscFiles.put(stateHandleID, fileHandle);
}
}
}
- Map<String, StreamStateHandle> sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size());
+ Map<StateHandleID, StreamStateHandle> sstFiles =
+ new HashMap<>(newSstFiles.size() + oldSstFiles.size());
+
sstFiles.putAll(newSstFiles);
sstFiles.putAll(oldSstFiles);
@@ -895,7 +902,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
}
- return new RocksDBIncrementalKeyedStateHandle(
+ return new IncrementalKeyedStateHandle(
stateBackend.operatorIdentifier,
stateBackend.keyGroupRange,
checkpointId,
@@ -940,39 +947,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
}
-
- /**
- * A placeholder state handle for shared state that will replaced by an original that was
- * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
- * case of {@link ByteStreamStateHandle}.
- */
- private static final class PlaceholderStreamStateHandle implements StreamStateHandle {
-
- private static final long serialVersionUID = 1L;
-
- /** We remember the size of the original file for which this is a placeholder */
- private final long originalSize;
-
- public PlaceholderStreamStateHandle(long originalSize) {
- this.originalSize = originalSize;
- }
-
- @Override
- public FSDataInputStream openInputStream() {
- throw new UnsupportedOperationException(
- "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
- }
-
- @Override
- public void discardState() throws Exception {
- // nothing to do.
- }
-
- @Override
- public long getStateSize() {
- return originalSize;
- }
- }
}
@Override
@@ -989,7 +963,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
} else if (MigrationUtil.isOldSavepointKeyedState(restoreState)) {
LOG.info("Converting RocksDB state from old savepoint.");
restoreOldSavepointKeyedState(restoreState);
- } else if (restoreState.iterator().next() instanceof RocksDBIncrementalKeyedStateHandle) {
+ } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this);
restoreOperation.restore(restoreState);
} else {
@@ -1302,7 +1276,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
private void restoreInstance(
- RocksDBIncrementalKeyedStateHandle restoreStateHandle,
+ IncrementalKeyedStateHandle restoreStateHandle,
boolean hasExtraKeys) throws Exception {
// read state data
@@ -1311,29 +1285,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
UUID.randomUUID().toString());
try {
- Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getUnregisteredSstFiles();
- for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
- String fileName = newSstFileEntry.getKey();
- StreamStateHandle remoteFileHandle = newSstFileEntry.getValue();
+ final Map<StateHandleID, StreamStateHandle> newSstFiles =
+ restoreStateHandle.getCreatedSharedState();
+ final Map<StateHandleID, StreamStateHandle> oldSstFiles =
+ restoreStateHandle.getReferencedSharedState();
+ final Map<StateHandleID, StreamStateHandle> miscFiles =
+ restoreStateHandle.getPrivateState();
- readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
- }
-
- Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getRegisteredSstFiles();
- for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
- String fileName = oldSstFileEntry.getKey();
- StreamStateHandle remoteFileHandle = oldSstFileEntry.getValue();
-
- readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
- }
-
- Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
- for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
- String fileName = miscFileEntry.getKey();
- StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
-
- readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
- }
+ readAllStateData(newSstFiles, restoreInstancePath);
+ readAllStateData(oldSstFiles, restoreInstancePath);
+ readAllStateData(miscFiles, restoreInstancePath);
// read meta data
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
@@ -1425,26 +1386,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throw new IOException("Could not create RocksDB data directory.");
}
- for (String newSstFileName : newSstFiles.keySet()) {
- File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName);
- File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName);
-
- Files.createLink(targetFile.toPath(), restoreFile.toPath());
- }
-
- for (String oldSstFileName : oldSstFiles.keySet()) {
- File restoreFile = new File(restoreInstancePath.getPath(), oldSstFileName);
- File targetFile = new File(stateBackend.instanceRocksDBPath, oldSstFileName);
-
- Files.createLink(targetFile.toPath(), restoreFile.toPath());
- }
-
- for (String miscFileName : miscFiles.keySet()) {
- File restoreFile = new File(restoreInstancePath.getPath(), miscFileName);
- File targetFile = new File(stateBackend.instanceRocksDBPath, miscFileName);
-
- Files.createLink(targetFile.toPath(), restoreFile.toPath());
- }
+ createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath);
+ createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath);
+ createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
stateBackend.db = stateBackend.openDB(
@@ -1470,7 +1414,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// use the restore sst files as the base for succeeding checkpoints
- Map<String, StreamStateHandle> sstFiles = new HashMap<>();
+ Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
sstFiles.putAll(newSstFiles);
sstFiles.putAll(oldSstFiles);
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles);
@@ -1485,6 +1429,29 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
+ private void readAllStateData(
+ Map<StateHandleID, StreamStateHandle> stateHandleMap,
+ Path restoreInstancePath) throws IOException {
+
+ for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+ StateHandleID stateHandleID = entry.getKey();
+ StreamStateHandle remoteFileHandle = entry.getValue();
+ readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
+ }
+ }
+
+ private void createFileHardLinksInRestorePath(
+ Map<StateHandleID, StreamStateHandle> stateHandleMap,
+ Path restoreInstancePath) throws IOException {
+
+ for (StateHandleID stateHandleID : stateHandleMap.keySet()) {
+ String newSstFileName = stateHandleID.toString();
+ File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName);
+ File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName);
+ Files.createLink(targetFile.toPath(), restoreFile.toPath());
+ }
+ }
+
void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
boolean hasExtraKeys = (restoreStateHandles.size() > 1 ||
@@ -1496,13 +1463,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
- if (! (rawStateHandle instanceof RocksDBIncrementalKeyedStateHandle)) {
+ if (! (rawStateHandle instanceof IncrementalKeyedStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
- "expected " + RocksDBIncrementalKeyedStateHandle.class +
+ "expected " + IncrementalKeyedStateHandle.class +
", but found " + rawStateHandle.getClass());
}
- RocksDBIncrementalKeyedStateHandle keyedStateHandle = (RocksDBIncrementalKeyedStateHandle) rawStateHandle;
+ IncrementalKeyedStateHandle keyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;
restoreInstance(keyedStateHandle, hasExtraKeys);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java b/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java
new file mode 100644
index 0000000..7245e61
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Serializable;
+
+/**
+ * Base class for typed IDs that are internally represented by a string. This class is not intended
+ * for direct use, but should be subclassed for type-safety.
+ */
+public class StringBasedID implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Uses a String as internal representation
+ */
+ private final String keyString;
+
+ /**
+ * Protected constructor to enfore that subclassing.
+ */
+ protected StringBasedID(String keyString) {
+ this.keyString = Preconditions.checkNotNull(keyString);
+ }
+
+ public String getKeyString() {
+ return keyString;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ StringBasedID that = (StringBasedID) o;
+ return keyString.equals(that.keyString);
+ }
+
+ @Override
+ public int hashCode() {
+ return keyString.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return keyString;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 1b5f2c6..b71418b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -20,14 +20,17 @@ package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -71,6 +74,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
private static final byte FILE_STREAM_STATE_HANDLE = 2;
private static final byte KEY_GROUPS_HANDLE = 3;
private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
+ private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
+ private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6;
/** The singleton instance of the serializer */
public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
@@ -287,7 +292,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
-
return new OperatorSubtaskState(
nonPartitionableState,
operatorStateBackend,
@@ -311,19 +315,63 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
}
serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+ } else if (stateHandle instanceof IncrementalKeyedStateHandle) {
+ IncrementalKeyedStateHandle incrementalKeyedStateHandle =
+ (IncrementalKeyedStateHandle) stateHandle;
+
+ dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE);
+
+ dos.writeLong(incrementalKeyedStateHandle.getCheckpointId());
+ dos.writeUTF(incrementalKeyedStateHandle.getOperatorIdentifier());
+ dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup());
+ dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+
+ serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos);
+
+ serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(), dos);
+ serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(), dos);
+ serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos);
} else {
throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
}
}
+	/**
+	 * Writes a map of state handles as an int entry count followed by, for each entry, the key's
+	 * backing string (modified UTF-8, via {@code writeUTF}) and the serialized stream state handle.
+	 * The format is read back by {@code deserializeStreamStateHandleMap(DataInputStream)}.
+	 *
+	 * @param map the handles to write, keyed by their state handle id
+	 * @param dos the stream to write to
+	 * @throws IOException if writing fails or a handle is of an unsupported type
+	 */
+	private static void serializeStreamStateHandleMap(
+			Map<StateHandleID, StreamStateHandle> map,
+			DataOutputStream dos) throws IOException {
+
+		dos.writeInt(map.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
+			// Use the explicit key accessor instead of toString(), so the persisted format does not
+			// depend on a method intended for human-readable output.
+			dos.writeUTF(entry.getKey().getKeyString());
+			serializeStreamStateHandle(entry.getValue(), dos);
+		}
+	}
+
+	/**
+	 * Reads a map of state handles in the format written by
+	 * {@code serializeStreamStateHandleMap(Map, DataOutputStream)}: an int entry count followed by,
+	 * for each entry, the UTF-encoded state handle id and the serialized stream state handle.
+	 *
+	 * @param dis the stream to read from
+	 * @return a new mutable map holding the deserialized entries
+	 * @throws IOException if reading fails or the stream is corrupt (e.g. a negative entry count)
+	 */
+	private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(
+			DataInputStream dis) throws IOException {
+
+		final int size = dis.readInt();
+		if (size < 0) {
+			// Only a corrupt or foreign stream can produce this; report it like the other format
+			// errors instead of letting the HashMap constructor throw IllegalArgumentException.
+			throw new IOException("Invalid number of state handle map entries: " + size);
+		}
+
+		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+
+		for (int i = 0; i < size; ++i) {
+			StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			result.put(stateHandleID, stateHandle);
+		}
+
+		return result;
+	}
+
private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
+
return null;
} else if (KEY_GROUPS_HANDLE == type) {
+
int startKeyGroup = dis.readInt();
int numKeyGroups = dis.readInt();
- KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+ KeyGroupRange keyGroupRange =
+ KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
long[] offsets = new long[numKeyGroups];
for (int i = 0; i < numKeyGroups; ++i) {
offsets[i] = dis.readLong();
@@ -332,6 +380,28 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
keyGroupRange, offsets);
StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ } else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
+
+ long checkpointId = dis.readLong();
+ String operatorId = dis.readUTF();
+ int startKeyGroup = dis.readInt();
+ int numKeyGroups = dis.readInt();
+ KeyGroupRange keyGroupRange =
+ KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+
+ StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis);
+ Map<StateHandleID, StreamStateHandle> createdStates = deserializeStreamStateHandleMap(dis);
+ Map<StateHandleID, StreamStateHandle> referencedStates = deserializeStreamStateHandleMap(dis);
+ Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);
+
+ return new IncrementalKeyedStateHandle(
+ operatorId,
+ keyGroupRange,
+ checkpointId,
+ createdStates,
+ referencedStates,
+ privateStates,
+ metaDataStateHandle);
} else {
throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
}
@@ -415,7 +485,10 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
-
+ } else if (stateHandle instanceof PlaceholderStreamStateHandle) {
+ PlaceholderStreamStateHandle placeholder = (PlaceholderStreamStateHandle) stateHandle;
+ dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE);
+ dos.writeLong(placeholder.getStateSize());
} else {
throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
}
@@ -437,6 +510,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
byte[] data = new byte[numBytes];
dis.readFully(data);
return new ByteStreamStateHandle(handleName, data);
+ } else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) {
+ return new PlaceholderStreamStateHandle(dis.readLong());
} else {
throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
new file mode 100644
index 0000000..706e219
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * The handle to the states of an incremental snapshot.
+ *
+ * <p>The states contained in an incremental snapshot are:
+ * <ul>
+ * <li>Created shared state: the (intended to be) shared files produced since the last completed
+ * checkpoint. Succeeding checkpoints can reference these files once this checkpoint has
+ * completed successfully.</li>
+ * <li>Referenced shared state: the shared files materialized in previous checkpoints.</li>
+ * <li>Private state: all other files, typically mutable, that cannot be shared with other
+ * checkpoints.</li>
+ * <li>Backend meta state: the information about the existing states.</li>
+ * </ul>
+ *
+ * <p>The state maps passed to the constructor are stored without copying and are modified in place
+ * by {@link #registerSharedStates(SharedStateRegistry)}, which replaces map values with the handles
+ * returned by the registry and moves all created shared state into the referenced shared state.
+ * The maps must therefore be mutable, and any other holder of the same map instances observes
+ * these changes. While its shared states are registered, a handle cannot be discarded.
+ *
+ * <p>IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
+ * should not be called from production code. This means this class is also not suited to serve as
+ * a key, e.g. in hash maps.
+ */
+public class IncrementalKeyedStateHandle implements KeyedStateHandle {
+
+	private static final Logger LOG = LoggerFactory.getLogger(IncrementalKeyedStateHandle.class);
+
+	private static final long serialVersionUID = -8328808513197388231L;
+
+	/** The operator instance identifier for this handle. */
+	private final String operatorIdentifier;
+
+	/** The key-group range covered by this state handle. */
+	private final KeyGroupRange keyGroupRange;
+
+	/** The checkpoint id. */
+	private final long checkpointId;
+
+	/** State that the incremental checkpoint newly created; emptied by registration. */
+	private final Map<StateHandleID, StreamStateHandle> createdSharedState;
+
+	/** State that the incremental checkpoint references from previous checkpoints. */
+	private final Map<StateHandleID, StreamStateHandle> referencedSharedState;
+
+	/** Private state in the incremental checkpoint. */
+	private final Map<StateHandleID, StreamStateHandle> privateState;
+
+	/** Primary meta data state of the incremental checkpoint. */
+	private final StreamStateHandle metaStateHandle;
+
+	/**
+	 * True if the state handle has already registered shared states.
+	 *
+	 * <p>Once the shared states are registered, it's the {@link SharedStateRegistry}'s
+	 * responsibility to maintain the shared states. But in the cases where the
+	 * state handle is discarded before performing the registration, the handle
+	 * should delete all the shared states created by it.
+	 */
+	private boolean registered;
+
+	/**
+	 * Creates a new handle whose shared states are not yet registered. All arguments except
+	 * {@code checkpointId} are checked to be non-null; the maps are stored without copying.
+	 *
+	 * @param operatorIdentifier the operator instance identifier, also used to build registry keys
+	 * @param keyGroupRange the key-group range covered by this handle
+	 * @param checkpointId the checkpoint id
+	 * @param createdSharedState shared state newly created by this checkpoint (must be mutable)
+	 * @param referencedSharedState shared state referenced from previous checkpoints (must be mutable)
+	 * @param privateState private state of this checkpoint
+	 * @param metaStateHandle the meta data state handle
+	 */
+	public IncrementalKeyedStateHandle(
+			String operatorIdentifier,
+			KeyGroupRange keyGroupRange,
+			long checkpointId,
+			Map<StateHandleID, StreamStateHandle> createdSharedState,
+			Map<StateHandleID, StreamStateHandle> referencedSharedState,
+			Map<StateHandleID, StreamStateHandle> privateState,
+			StreamStateHandle metaStateHandle) {
+
+		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
+		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+		this.checkpointId = checkpointId;
+		this.createdSharedState = Preconditions.checkNotNull(createdSharedState);
+		this.referencedSharedState = Preconditions.checkNotNull(referencedSharedState);
+		this.privateState = Preconditions.checkNotNull(privateState);
+		this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
+		this.registered = false;
+	}
+
+	@Override
+	public KeyGroupRange getKeyGroupRange() {
+		return keyGroupRange;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() {
+		return createdSharedState;
+	}
+
+	public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() {
+		return referencedSharedState;
+	}
+
+	public Map<StateHandleID, StreamStateHandle> getPrivateState() {
+		return privateState;
+	}
+
+	public StreamStateHandle getMetaStateHandle() {
+		return metaStateHandle;
+	}
+
+	public String getOperatorIdentifier() {
+		return operatorIdentifier;
+	}
+
+	/**
+	 * Returns this whole handle if its key-group range overlaps the given range, otherwise null.
+	 * The handle is not split by key-group.
+	 */
+	@Override
+	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+		// Test for overlap by counting key-groups rather than by reference comparison against the
+		// EMPTY_KEY_GROUP_RANGE instance, so the result does not depend on KeyGroupRange always
+		// returning that exact singleton for an empty intersection.
+		if (this.keyGroupRange.getIntersection(keyGroupRange).getNumberOfKeyGroups() > 0) {
+			return this;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Discards the meta data, the private state and the created shared state on a best-effort basis:
+	 * a failure in one part is logged and does not prevent the other parts from being discarded.
+	 * Referenced shared state is not discarded here.
+	 *
+	 * @throws IllegalStateException if the shared states of this handle are currently registered
+	 */
+	@Override
+	public void discardState() throws Exception {
+
+		Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. Must unregister first.");
+
+		try {
+			metaStateHandle.discardState();
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard meta data.", e);
+		}
+
+		try {
+			StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard misc file states.", e);
+		}
+
+		try {
+			StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values());
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard new sst file states.", e);
+		}
+
+	}
+
+	/**
+	 * Returns the total size of this handle: the private state size plus the size of all
+	 * referenced shared state.
+	 */
+	@Override
+	public long getStateSize() {
+		long size = getPrivateStateSize();
+
+		for (StreamStateHandle referencedHandle : referencedSharedState.values()) {
+			size += referencedHandle.getStateSize();
+		}
+
+		return size;
+	}
+
+	/**
+	 * Returns the size of the state that is privately owned by this handle: the meta data, the
+	 * created shared state (which is empty after registration) and the private state.
+	 */
+	public long getPrivateStateSize() {
+		long size = StateUtil.getStateSize(metaStateHandle);
+
+		for (StreamStateHandle createdHandle : createdSharedState.values()) {
+			size += createdHandle.getStateSize();
+		}
+
+		for (StreamStateHandle privateHandle : privateState.values()) {
+			size += privateHandle.getStateSize();
+		}
+
+		return size;
+	}
+
+	/**
+	 * Registers the created shared state as new references and obtains references for the referenced
+	 * shared state in the given registry. Every map value is replaced by the handle the registry
+	 * returns. Afterwards all shared state is held in the referenced shared state map, so it no
+	 * longer counts as private state for {@link #discardState()}.
+	 *
+	 * @throws IllegalStateException if the shared states are already registered
+	 */
+	@Override
+	public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
+		Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> createdEntry : createdSharedState.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(createdEntry.getKey());
+
+			SharedStateRegistry.Result result =
+				stateRegistry.registerNewReference(registryKey, createdEntry.getValue());
+
+			// We update our reference with the result from the registry, to prevent the following
+			// problem:
+			// A previous checkpoint n has already registered the state. This can happen if a
+			// following checkpoint (n + x) wants to reference the same state before the backend got
+			// notified that checkpoint n completed. In this case, the shared registry did
+			// deduplication and returns the previous reference.
+			createdEntry.setValue(result.getReference());
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> referencedEntry : referencedSharedState.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(referencedEntry.getKey());
+
+			SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
+
+			// Again we update our state handle with the result from the registry, thus replacing
+			// placeholder state handles with the originals.
+			referencedEntry.setValue(result.getReference());
+		}
+
+		// Migrate state from unregistered to registered, so that it will not count as private state
+		// for #discardState() from now.
+		referencedSharedState.putAll(createdSharedState);
+		createdSharedState.clear();
+
+		registered = true;
+	}
+
+	/**
+	 * Releases, in the given registry, the references to all shared state of this handle and marks
+	 * the handle as unregistered.
+	 *
+	 * @throws IllegalStateException if the shared states are not registered
+	 */
+	@Override
+	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
+		Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
+
+		// registerSharedStates() moves every created entry into referencedSharedState, so this map
+		// is expected to be empty here; iterating it is harmless.
+		for (Map.Entry<StateHandleID, StreamStateHandle> createdEntry : createdSharedState.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(createdEntry.getKey());
+			stateRegistry.releaseReference(registryKey);
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> referencedEntry : referencedSharedState.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(referencedEntry.getKey());
+			stateRegistry.releaseReference(registryKey);
+		}
+
+		registered = false;
+	}
+
+	/**
+	 * Builds the registry key for a shared state from this handle's operator identifier, its
+	 * key-group range and the given state handle id.
+	 */
+	private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+		return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId);
+	}
+
+	/**
+	 * This method should only be called in tests! This should never serve as key in a hash map.
+	 */
+	@VisibleForTesting
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		IncrementalKeyedStateHandle that = (IncrementalKeyedStateHandle) o;
+
+		if (getCheckpointId() != that.getCheckpointId()) {
+			return false;
+		}
+		if (!getOperatorIdentifier().equals(that.getOperatorIdentifier())) {
+			return false;
+		}
+		if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
+			return false;
+		}
+		if (!getCreatedSharedState().equals(that.getCreatedSharedState())) {
+			return false;
+		}
+		if (!getReferencedSharedState().equals(that.getReferencedSharedState())) {
+			return false;
+		}
+		if (!getPrivateState().equals(that.getPrivateState())) {
+			return false;
+		}
+		return getMetaStateHandle().equals(that.getMetaStateHandle());
+	}
+
+	/**
+	 * This method should only be called in tests! This should never serve as key in a hash map.
+	 */
+	@VisibleForTesting
+	@Override
+	public int hashCode() {
+		int result = getOperatorIdentifier().hashCode();
+		result = 31 * result + getKeyGroupRange().hashCode();
+		result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
+		result = 31 * result + getCreatedSharedState().hashCode();
+		result = 31 * result + getReferencedSharedState().hashCode();
+		result = 31 * result + getPrivateState().hashCode();
+		result = 31 * result + getMetaStateHandle().hashCode();
+		return result;
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
new file mode 100644
index 0000000..2136061
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+/**
+ * A stand-in for a shared state handle that will be replaced by the original handle created in a
+ * previous checkpoint. Using a placeholder avoids sending the same handle twice, which matters e.g.
+ * for a {@link ByteStreamStateHandle}. Placeholders are meant to be used in the referenced shared
+ * states of an {@link IncrementalKeyedStateHandle}.
+ *
+ * <p>IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
+ * should not be called from production code. This means this class is also not suited to serve as
+ * a key, e.g. in hash maps.
+ */
+public class PlaceholderStreamStateHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Size of the original state this placeholder stands in for, reported by getStateSize(). */
+	private final long originalSize;
+
+	/**
+	 * @param originalSize the size of the original state this placeholder stands in for
+	 */
+	public PlaceholderStreamStateHandle(long originalSize) {
+		this.originalSize = originalSize;
+	}
+
+	/**
+	 * Always fails, because a placeholder has no readable content of its own.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public FSDataInputStream openInputStream() {
+		throw new UnsupportedOperationException(
+			"This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		// Intentionally a no-op: the placeholder itself holds no state to discard.
+	}
+
+	/** Returns the size of the original state, not of the placeholder. */
+	@Override
+	public long getStateSize() {
+		return originalSize;
+	}
+
+	/**
+	 * Compares by original size only. This method should only be called in tests! This should
+	 * never serve as key in a hash map.
+	 */
+	@VisibleForTesting
+	@Override
+	public boolean equals(Object other) {
+		if (other == this) {
+			return true;
+		}
+		if (other != null && other.getClass() == getClass()) {
+			return ((PlaceholderStreamStateHandle) other).originalSize == originalSize;
+		}
+		return false;
+	}
+
+	/**
+	 * Hash of the original size. This method should only be called in tests! This should never
+	 * serve as key in a hash map.
+	 */
+	@VisibleForTesting
+	@Override
+	public int hashCode() {
+		final long bits = originalSize;
+		return (int) ((bits >>> 32) ^ bits);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java
deleted file mode 100644
index c8c4046..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-/**
- * A handle to those states that are referenced by different checkpoints.
- *
- * <p> Each shared state handle is identified by a unique key. Two shared states
- * are considered equal if their keys are identical.
- *
- * <p> All shared states are registered at the {@link SharedStateRegistry} once
- * they are received by the {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}
- * and will be unregistered when the checkpoints are discarded. A shared state
- * will be discarded once it is not referenced by any checkpoint. A shared state
- * should not be referenced any more if it has been discarded.
- */
-public interface SharedStateHandle extends StateObject {
-
- /**
- * Return the identifier of the shared state.
- */
- String getRegistrationKey();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index dbf4642..9cfdec7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Executor;
/**
* A {@code SharedStateRegistry} will be deployed in the
* {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
- * maintain the reference count of {@link SharedStateHandle}s which are shared
+ * maintain the reference count of {@link StreamStateHandle}s which are shared
* among different incremental checkpoints.
*/
public class SharedStateRegistry {
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
index 9e59359..58262ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
@@ -18,9 +18,8 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringBasedID;
/**
* This class represents a key that uniquely identifies (on a logical level) state handles for
@@ -28,41 +27,16 @@ import java.io.Serializable;
* be the same should have the same {@link SharedStateRegistryKey}. The meaning of logical
* equivalence is up to the application.
*/
-public class SharedStateRegistryKey implements Serializable {
+public class SharedStateRegistryKey extends StringBasedID {
private static final long serialVersionUID = 1L;
- /** Uses a String as internal representation */
- private final String keyString;
-
- public SharedStateRegistryKey(String keyString) {
- this.keyString = Preconditions.checkNotNull(keyString);
- }
-
- public String getKeyString() {
- return keyString;
+ public SharedStateRegistryKey(String prefix, StateHandleID stateHandleID) {
+ super(prefix + '-' + stateHandleID);
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SharedStateRegistryKey that = (SharedStateRegistryKey) o;
- return keyString.equals(that.keyString);
- }
-
- @Override
- public int hashCode() {
- return keyString.hashCode();
- }
-
- @Override
- public String toString() {
- return keyString;
+ @VisibleForTesting
+ public SharedStateRegistryKey(String keyString) {
+ super(keyString);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
deleted file mode 100644
index b736252..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-/**
- * StateHandle is a general handle interface meant to abstract operator state fetching.
- * A StateHandle implementation can for example include the state itself in cases where the state
- * is lightweight or fetching it lazily from some external storage when the state is too large.
- */
-public interface StateHandle<T> extends StateObject {
-
- /**
- * This retrieves and return the state represented by the handle.
- *
- * @param userCodeClassLoader Class loader for deserializing user code specific classes
- *
- * @return The state represented by the handle.
- * @throws java.lang.Exception Thrown, if the state cannot be fetched.
- */
- T getState(ClassLoader userCodeClassLoader) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java
new file mode 100644
index 0000000..5e95cff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.StringBasedID;
+
+/**
+ * Identifier that allows state handles to be compared on a logical level.
+ *
+ * <p>State handles that are logically equal (the implementation decides what that means) should
+ * always return the same ID. For a file-based state, for example, the ID could be derived from the
+ * string representation of the full file path.
+ */
+public class StateHandleID extends StringBasedID {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates an ID backed by the given string.
+	 *
+	 * @param idString the string backing this ID, must not be null
+	 */
+	public StateHandleID(String idString) {
+		super(idString);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index ba77dbc..b63782d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -27,10 +27,15 @@ import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.StringUtils;
@@ -41,6 +46,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -78,6 +84,7 @@ public class CheckpointTestUtils {
boolean hasKeyedBackend = random.nextInt(4) != 0;
boolean hasKeyedStream = random.nextInt(4) != 0;
+ boolean isIncremental = random.nextInt(3) == 0;
for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) {
@@ -108,21 +115,19 @@ public class CheckpointTestUtils {
operatorStateHandleStream = new OperatorStateHandle(offsetsMap, operatorStateStream);
}
- KeyGroupsStateHandle keyedStateBackend = null;
- KeyGroupsStateHandle keyedStateStream = null;
+ KeyedStateHandle keyedStateBackend = null;
+ KeyedStateHandle keyedStateStream = null;
if (hasKeyedBackend) {
- keyedStateBackend = new KeyGroupsStateHandle(
- new KeyGroupRangeOffsets(1, 1, new long[]{42}),
- new TestByteStreamStateHandleDeepCompare("c", "Hello"
- .getBytes(ConfigConstants.DEFAULT_CHARSET)));
+ if (isIncremental) {
+ keyedStateBackend = createDummyIncrementalKeyedStateHandle(random);
+ } else {
+ keyedStateBackend = createDummyKeyGroupStateHandle(random);
+ }
}
if (hasKeyedStream) {
- keyedStateStream = new KeyGroupsStateHandle(
- new KeyGroupRangeOffsets(1, 1, new long[]{23}),
- new TestByteStreamStateHandleDeepCompare("d", "World"
- .getBytes(ConfigConstants.DEFAULT_CHARSET)));
+ keyedStateStream = createDummyKeyGroupStateHandle(random);
}
taskState.putState(subtaskIdx, new OperatorSubtaskState(
@@ -210,17 +215,11 @@ public class CheckpointTestUtils {
KeyGroupsStateHandle keyedStateStream = null;
if (hasKeyedBackend) {
- keyedStateBackend = new KeyGroupsStateHandle(
- new KeyGroupRangeOffsets(1, 1, new long[]{42}),
- new TestByteStreamStateHandleDeepCompare("c", "Hello"
- .getBytes(ConfigConstants.DEFAULT_CHARSET)));
+ keyedStateBackend = createDummyKeyGroupStateHandle(random);
}
if (hasKeyedStream) {
- keyedStateStream = new KeyGroupsStateHandle(
- new KeyGroupRangeOffsets(1, 1, new long[]{23}),
- new TestByteStreamStateHandleDeepCompare("d", "World"
- .getBytes(ConfigConstants.DEFAULT_CHARSET)));
+ keyedStateStream = createDummyKeyGroupStateHandle(random);
}
taskState.putState(subtaskIdx, new SubtaskState(
@@ -272,4 +271,56 @@ public class CheckpointTestUtils {
/** utility class, not meant to be instantiated */
private CheckpointTestUtils() {}
+
+ /** Creates an {@link IncrementalKeyedStateHandle} for key-group range (1, 1) whose handle maps are randomly generated. */
+ private static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
+ return new IncrementalKeyedStateHandle(
+ createRandomUUID(rnd).toString(),
+ new KeyGroupRange(1, 1),
+ 42L,
+ createRandomOwnedHandleMap(rnd),
+ createRandomReferencedHandleMap(rnd),
+ createRandomOwnedHandleMap(rnd),
+ createDummyStreamStateHandle(rnd));
+ }
+ /** Creates a map with 0 to 3 entries from random {@link StateHandleID}s to dummy stream state handles. */
+ private static Map<StateHandleID, StreamStateHandle> createRandomOwnedHandleMap(Random rnd) {
+ final int size = rnd.nextInt(4);
+ Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+ for (int i = 0; i < size; ++i) {
+ StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString());
+ StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd);
+ result.put(randomId, stateHandle);
+ }
+
+ return result;
+ }
+ /** Creates a map with 0 to 3 entries from random IDs to placeholder handles built from a random value in [0, 1024). */
+ private static Map<StateHandleID, StreamStateHandle> createRandomReferencedHandleMap(Random rnd) {
+ final int size = rnd.nextInt(4);
+ Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+ for (int i = 0; i < size; ++i) {
+ StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString());
+ result.put(randomId, new PlaceholderStreamStateHandle(rnd.nextInt(1024)));
+ }
+
+ return result;
+ }
+ /** Creates a {@link KeyGroupsStateHandle} with a single random offset in [0, 1024) and a dummy stream handle. */
+ private static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
+ return new KeyGroupsStateHandle(
+ new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}),
+ createDummyStreamStateHandle(rnd));
+ }
+ /** Creates a {@link TestByteStreamStateHandleDeepCompare} whose name and content are random UUID strings. */
+ private static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
+ return new TestByteStreamStateHandleDeepCompare(
+ String.valueOf(createRandomUUID(rnd)),
+ String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));
+ }
+ /** Builds a UUID from the given random source, so results are reproducible for a seeded {@link Random}. */
+ private static UUID createRandomUUID(Random rnd) {
+ return new UUID(rnd.nextLong(), rnd.nextLong());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
index 154d761..602390b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
-
import org.junit.Test;
import java.io.DataInputStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
deleted file mode 100644
index d6966d0..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.RunnableFuture;
-
-public class StateUtilTest extends TestLogger {
-
- /**
- * Tests that {@link StateUtil#discardStateFuture} can handle state futures with null value.
- */
- @Test
- public void testDiscardRunnableFutureWithNullValue() throws Exception {
- RunnableFuture<StateHandle<?>> stateFuture = DoneFuture.nullValue();
- StateUtil.discardStateFuture(stateFuture);
- }
-}
[2/5] flink git commit: [FLINK-6527] [checkpoint]
OperatorSubtaskState has empty implementations of (un)/registerSharedStates
Posted by sr...@apache.org.
[FLINK-6527] [checkpoint] OperatorSubtaskState has empty implementations of (un)/registerSharedStates
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efbb41bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efbb41bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efbb41bc
Branch: refs/heads/master
Commit: efbb41bc633e6c72037b9dfd311b23693335844e
Parents: 958773b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 10 14:57:55 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/OperatorSubtaskState.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efbb41bc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 863816a..49ef863 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -148,12 +148,24 @@ public class OperatorSubtaskState implements CompositeStateHandle {
@Override
public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
- // No shared states
+ if (managedKeyedState != null) {
+ managedKeyedState.registerSharedStates(sharedStateRegistry);
+ }
+
+ if (rawKeyedState != null) {
+ rawKeyedState.registerSharedStates(sharedStateRegistry);
+ }
}
@Override
public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
- // No shared states
+ if (managedKeyedState != null) {
+ managedKeyedState.unregisterSharedStates(sharedStateRegistry);
+ }
+
+ if (rawKeyedState != null) {
+ rawKeyedState.unregisterSharedStates(sharedStateRegistry);
+ }
}
@Override
[5/5] flink git commit: [FLINK-6504] [checkpoint] Fix synchronization
on materializedSstFiles in RocksDBKeyedStateBackend
Posted by sr...@apache.org.
[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/958773b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/958773b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/958773b7
Branch: refs/heads/master
Commit: 958773b71c52aae94560508f8d4cd894059d4467
Parents: 4745d0c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu May 11 11:59:47 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200
----------------------------------------------------------------------
.../contrib/streaming/state/RocksDBKeyedStateBackend.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/958773b7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 1080e59..b9468f7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -828,9 +828,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
void takeSnapshot() throws Exception {
+ assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
+
// use the last completed checkpoint as the comparison base.
baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
@@ -888,7 +891,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
sstFiles.putAll(newSstFiles);
sstFiles.putAll(oldSstFiles);
- stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+ synchronized (stateBackend.asyncSnapshotLock) {
+ stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+ }
return new RocksDBIncrementalKeyedStateHandle(
stateBackend.operatorIdentifier,
[3/5] flink git commit: [FLINK-6537] [checkpoint] First set of fixes
for (de)registration of shared state in incremental checkpoints
Posted by sr...@apache.org.
[FLINK-6537] [checkpoint] First set of fixes for (de)registration of shared state in incremental checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4745d0c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4745d0c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4745d0c0
Branch: refs/heads/master
Commit: 4745d0c0822ba1f1c32568d0c4869cb44fa35426
Parents: b54f448
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 10 17:59:39 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200
----------------------------------------------------------------------
.../RocksDBIncrementalKeyedStateHandle.java | 123 ++++++------
.../state/RocksDBKeyedStateBackend.java | 64 +++++-
.../runtime/state/SharedStateRegistry.java | 196 ++++++++++++++-----
.../runtime/state/SharedStateRegistryKey.java | 68 +++++++
.../runtime/state/SharedStateRegistryTest.java | 85 +++++---
.../runtime/state/StateBackendTestBase.java | 18 +-
6 files changed, 397 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
index 5ac9e46..961182d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
@@ -18,12 +18,11 @@
package org.apache.flink.contrib.streaming.state;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.CompositeStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SharedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
@@ -54,17 +53,15 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
private static final long serialVersionUID = -8328808513197388231L;
- private final JobID jobId;
-
private final String operatorIdentifier;
private final KeyGroupRange keyGroupRange;
private final long checkpointId;
- private final Map<String, StreamStateHandle> newSstFiles;
+ private final Map<String, StreamStateHandle> unregisteredSstFiles;
- private final Map<String, StreamStateHandle> oldSstFiles;
+ private final Map<String, StreamStateHandle> registeredSstFiles;
private final Map<String, StreamStateHandle> miscFiles;
@@ -81,21 +78,19 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
private boolean registered;
RocksDBIncrementalKeyedStateHandle(
- JobID jobId,
String operatorIdentifier,
KeyGroupRange keyGroupRange,
long checkpointId,
- Map<String, StreamStateHandle> newSstFiles,
- Map<String, StreamStateHandle> oldSstFiles,
+ Map<String, StreamStateHandle> unregisteredSstFiles,
+ Map<String, StreamStateHandle> registeredSstFiles,
Map<String, StreamStateHandle> miscFiles,
StreamStateHandle metaStateHandle) {
- this.jobId = Preconditions.checkNotNull(jobId);
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
this.checkpointId = checkpointId;
- this.newSstFiles = Preconditions.checkNotNull(newSstFiles);
- this.oldSstFiles = Preconditions.checkNotNull(oldSstFiles);
+ this.unregisteredSstFiles = Preconditions.checkNotNull(unregisteredSstFiles);
+ this.registeredSstFiles = Preconditions.checkNotNull(registeredSstFiles);
this.miscFiles = Preconditions.checkNotNull(miscFiles);
this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
this.registered = false;
@@ -110,12 +105,12 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
return checkpointId;
}
- Map<String, StreamStateHandle> getNewSstFiles() {
- return newSstFiles;
+ Map<String, StreamStateHandle> getUnregisteredSstFiles() {
+ return unregisteredSstFiles;
}
- Map<String, StreamStateHandle> getOldSstFiles() {
- return oldSstFiles;
+ Map<String, StreamStateHandle> getRegisteredSstFiles() {
+ return registeredSstFiles;
}
Map<String, StreamStateHandle> getMiscFiles() {
@@ -138,6 +133,8 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
@Override
public void discardState() throws Exception {
+ Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. Must unregister first.");
+
try {
metaStateHandle.discardState();
} catch (Exception e) {
@@ -150,24 +147,23 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
LOG.warn("Could not properly discard misc file states.", e);
}
- if (!registered) {
- try {
- StateUtil.bestEffortDiscardAllStateObjects(newSstFiles.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard new sst file states.", e);
- }
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(unregisteredSstFiles.values());
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard new sst file states.", e);
}
+
}
@Override
public long getStateSize() {
long size = StateUtil.getStateSize(metaStateHandle);
- for (StreamStateHandle newSstFileHandle : newSstFiles.values()) {
+ for (StreamStateHandle newSstFileHandle : unregisteredSstFiles.values()) {
size += newSstFileHandle.getStateSize();
}
- for (StreamStateHandle oldSstFileHandle : oldSstFiles.values()) {
+ for (StreamStateHandle oldSstFileHandle : registeredSstFiles.values()) {
size += oldSstFileHandle.getStateSize();
}
@@ -180,69 +176,66 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
- for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
- SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+ for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
+ SharedStateRegistryKey registryKey =
+ createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
- int referenceCount = stateRegistry.register(stateHandle);
- Preconditions.checkState(referenceCount == 1);
+ SharedStateRegistry.Result result =
+ stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
+
+ // We update our reference with the result from the registry, to prevent the following
+ // problem:
+ // A previous checkpoint n has already registered the state. This can happen if a
+ // following checkpoint (n + x) wants to reference the same state before the backend got
+ // notified that checkpoint n completed. In this case, the shared registry deduplicates
+ // and returns the previous reference.
+ newSstFileEntry.setValue(result.getReference());
}
- for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
- SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+ for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) {
+ SharedStateRegistryKey registryKey =
+ createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+ SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
- int referenceCount = stateRegistry.register(stateHandle);
- Preconditions.checkState(referenceCount > 1);
+ // Again we update our state handle with the result from the registry, thus replacing
+ // placeholder state handles with the originals.
+ oldSstFileName.setValue(result.getReference());
}
+ // Migrate state from unregistered to registered, so that it will no longer count as
+ // private state for #discardState() from now on.
+ registeredSstFiles.putAll(unregisteredSstFiles);
+ unregisteredSstFiles.clear();
+
registered = true;
}
@Override
public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
- for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
- stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
+ for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
+ SharedStateRegistryKey registryKey =
+ createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+ stateRegistry.releaseReference(registryKey);
}
- for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
- stateRegistry.unregister(new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue()));
+ for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : registeredSstFiles.entrySet()) {
+ SharedStateRegistryKey registryKey =
+ createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
+ stateRegistry.releaseReference(registryKey);
}
registered = false;
}
- private class SstFileStateHandle implements SharedStateHandle {
-
- private static final long serialVersionUID = 9092049285789170669L;
-
- private final String fileName;
-
- private final StreamStateHandle delegateStateHandle;
-
- private SstFileStateHandle(
- String fileName,
- StreamStateHandle delegateStateHandle) {
- this.fileName = fileName;
- this.delegateStateHandle = delegateStateHandle;
- }
-
- @Override
- public String getRegistrationKey() {
- return jobId + "-" + operatorIdentifier + "-" + keyGroupRange + "-" + fileName;
- }
-
- @Override
- public void discardState() throws Exception {
- delegateStateHandle.discardState();
- }
-
- @Override
- public long getStateSize() {
- return delegateStateHandle.getStateSize();
- }
+ private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(String fileName) {
+ return new SharedStateRegistryKey(operatorIdentifier + "-" + keyGroupRange + "-" + fileName);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6af53c3..1080e59 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.StateMigrationUtil;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -62,9 +61,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateMigrationUtil;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -72,6 +72,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
@@ -709,16 +710,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final class RocksDBIncrementalSnapshotOperation {
+ /** The backend that we snapshot. */
private final RocksDBKeyedStateBackend<?> stateBackend;
+ /** Stream factory that creates the output streams to DFS. */
private final CheckpointStreamFactory checkpointStreamFactory;
+ /** ID of the current checkpoint. */
private final long checkpointId;
+ /** Timestamp of the current checkpoint. */
private final long checkpointTimestamp;
+ /** All sst files of the last completed checkpoint; used as the comparison base for this snapshot. */
private Map<String, StreamStateHandle> baseSstFiles;
+ /** Snapshots of the registered state meta data. */
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
private FileSystem backupFileSystem;
@@ -864,10 +871,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (fileHandle == null) {
fileHandle = materializeStateData(filePath);
-
newSstFiles.put(fileName, fileHandle);
} else {
- oldSstFiles.put(fileName, fileHandle);
+ // we introduce a placeholder state handle, that is replaced with the
+ // original from the shared state registry (created from a previous checkpoint)
+ oldSstFiles.put(fileName, new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
}
} else {
StreamStateHandle fileHandle = materializeStateData(filePath);
@@ -882,9 +890,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
- return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId,
- stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
- checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle);
+ return new RocksDBIncrementalKeyedStateHandle(
+ stateBackend.operatorIdentifier,
+ stateBackend.keyGroupRange,
+ checkpointId,
+ newSstFiles,
+ oldSstFiles,
+ miscFiles,
+ metaStateHandle);
}
void stop() {
@@ -922,6 +935,39 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
}
+
+ /**
+ * A placeholder state handle for shared state that will be replaced by an original that was
+ * created in a previous checkpoint. This avoids sending the same handle twice, e.g. in the
+ * case of {@link ByteStreamStateHandle}.
+ */
+ private static final class PlaceholderStreamStateHandle implements StreamStateHandle {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Size of the original state this placeholder stands for; reported by {@link #getStateSize()}. */
+ private final long originalSize;
+
+ public PlaceholderStreamStateHandle(long originalSize) {
+ this.originalSize = originalSize;
+ }
+
+ @Override
+ public FSDataInputStream openInputStream() {
+ throw new UnsupportedOperationException(
+ "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ // nothing to do: the placeholder itself owns no state.
+ }
+
+ @Override
+ public long getStateSize() {
+ return originalSize;
+ }
+ }
}
@Override
@@ -1260,7 +1306,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
UUID.randomUUID().toString());
try {
- Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getNewSstFiles();
+ Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getUnregisteredSstFiles();
for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
String fileName = newSstFileEntry.getKey();
StreamStateHandle remoteFileHandle = newSstFileEntry.getValue();
@@ -1268,7 +1314,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
}
- Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getOldSstFiles();
+ Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getRegisteredSstFiles();
for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
String fileName = oldSstFileEntry.getKey();
StreamStateHandle remoteFileHandle = oldSstFileEntry.getValue();
http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 2cb43ac..dbf4642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -18,91 +18,137 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
/**
* A {@code SharedStateRegistry} will be deployed in the
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
* maintain the reference count of {@link SharedStateHandle}s which are shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
*/
public class SharedStateRegistry {
private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
/** All registered state objects by an artificial key */
- private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
+ private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
+
+ /** Executor that runs the disposal of unreferenced or rejected duplicate state handles */
+ private final Executor asyncDisposalExecutor;
public SharedStateRegistry() {
this.registeredStates = new HashMap<>();
+ this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534 replace the direct executor with a truly asynchronous one
}
/**
- * Register a reference to the given shared state in the registry. This increases the reference
- * count for the this shared state by one. Returns the reference count after the update.
+ * Register a reference to the given (supposedly new) shared state in the registry.
+ * This does the following: We check if the state handle is actually new by looking up the
+ * registrationKey. If it is new, we register it with a reference count of 1. If there is
+ * already a state handle registered under the given key, we schedule the given "new" state
+ * handle for disposal (unless it equals the registered one), uptick the reference count of the
+ * previously existing state handle and return that handle as a replacement in the result.
+ *
+ * <p>IMPORTANT: callers should check the state handle returned by the result, because the
+ * registry is performing deduplication and could potentially return a handle that is supposed
+ * to replace the one from the registration request.
*
* @param state the shared state for which we register a reference.
- * @return the updated reference count for the given shared state.
+ * @return the result of this registration request, consisting of the state handle that is
+ * registered under the key by the end of the operation and its current reference count.
*/
- public int register(SharedStateHandle state) {
- if (state == null) {
- return 0;
- }
+ public Result registerNewReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
+
+ Preconditions.checkNotNull(state);
+
+ StreamStateHandle scheduledStateDeletion = null;
+ SharedStateRegistry.SharedStateEntry entry;
synchronized (registeredStates) {
- SharedStateRegistry.SharedStateEntry entry =
- registeredStates.get(state.getRegistrationKey());
+ entry = registeredStates.get(registrationKey);
if (entry == null) {
- SharedStateRegistry.SharedStateEntry stateEntry =
- new SharedStateRegistry.SharedStateEntry(state);
- registeredStates.put(state.getRegistrationKey(), stateEntry);
- return 1;
+ entry = new SharedStateRegistry.SharedStateEntry(state);
+ registeredStates.put(registrationKey, entry);
} else {
+ // discard the new handle unless it is equal to the handle that is already registered
+ if (!Objects.equals(state, entry.state)) {
+ scheduledStateDeletion = state;
+ }
entry.increaseReferenceCount();
- return entry.getReferenceCount();
}
}
+ // NOTE(review): the Result below reads entry's count outside the lock; consider creating it inside
+ scheduleAsyncDelete(scheduledStateDeletion);
+ return new Result(entry);
}
/**
- * Unregister one reference to the given shared state in the registry. This decreases the
- * reference count by one. Once the count reaches zero, the shared state is deleted.
+ * Obtains one reference to the given shared state in the registry. This increases the
+ * reference count by one.
*
- * @param state the shared state for which we unregister a reference.
- * @return the reference count for the shared state after the update.
+ * @param registrationKey the key of the shared state for which we obtain a reference.
+ * @return the result of the request, consisting of the reference count after this operation
+ * and the state handle.
+ * @throws NullPointerException if the key is null or no state is registered under it.
*/
- public int unregister(SharedStateHandle state) {
- if (state == null) {
- return 0;
+ public Result obtainReference(SharedStateRegistryKey registrationKey) {
+
+ Preconditions.checkNotNull(registrationKey);
+
+ synchronized (registeredStates) {
+ SharedStateRegistry.SharedStateEntry entry =
+ Preconditions.checkNotNull(registeredStates.get(registrationKey),
+ "Could not find a state for the given registration key!");
+ entry.increaseReferenceCount();
+ return new Result(entry);
}
+ }
+
+ /**
+ * Releases one reference to the given shared state in the registry. This decreases the
+ * reference count by one. Once the count reaches zero, the state is unregistered and disposed.
+ *
+ * @param registrationKey the key of the shared state for which we release a reference.
+ * @return the result of the request, consisting of the reference count after this operation
+ * and the state handle; the handle is null (count 0) if this request released the last one.
+ */
+ public Result releaseReference(SharedStateRegistryKey registrationKey) {
+
+ Preconditions.checkNotNull(registrationKey);
+ // both are assigned under the lock and only used after the lock is released
+ final Result result;
+ final StreamStateHandle scheduledStateDeletion;
synchronized (registeredStates) {
- SharedStateRegistry.SharedStateEntry entry = registeredStates.get(state.getRegistrationKey());
+ SharedStateRegistry.SharedStateEntry entry = registeredStates.get(registrationKey);
- Preconditions.checkState(entry != null, "Cannot unregister a state that is not registered.");
+ Preconditions.checkState(entry != null,
+ "Cannot unregister a state that is not registered.");
entry.decreaseReferenceCount();
- final int newReferenceCount = entry.getReferenceCount();
-
// Remove the state from the registry when it's not referenced any more.
- if (newReferenceCount <= 0) {
- registeredStates.remove(state.getRegistrationKey());
- try {
- entry.getState().discardState();
- } catch (Exception e) {
- LOG.warn("Cannot properly discard the state {}.", entry.getState(), e);
- }
+ if (entry.getReferenceCount() <= 0) {
+ registeredStates.remove(registrationKey);
+ scheduledStateDeletion = entry.getState();
+ result = new Result(null, 0);
+ } else {
+ scheduledStateDeletion = null;
+ result = new Result(entry);
}
- return newReferenceCount;
}
+ // disposal is scheduled outside the synchronized block
+ scheduleAsyncDelete(scheduledStateDeletion);
+ return result;
}
/**
@@ -122,8 +168,6 @@ public class SharedStateRegistry {
}
}
-
-
/**
* Unregister all the shared states referenced by the given.
*
@@ -141,20 +185,30 @@ public class SharedStateRegistry {
}
}
+ private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
+ if (streamStateHandle != null) { // null signals that nothing needs to be disposed
+ asyncDisposalExecutor.execute( // failures are logged by AsyncDisposalRunnable, not rethrown
+ new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
+ }
+ }
+
+ /**
+ * An entry in the registry, tracking the handle and the corresponding reference count.
+ */
private static class SharedStateEntry {
- /** The shared object */
- private final SharedStateHandle state;
+ /** The shared state handle */
+ private final StreamStateHandle state;
- /** The reference count of the object */
+ /** The current reference count of the state handle */
private int referenceCount;
- SharedStateEntry(SharedStateHandle value) {
+ SharedStateEntry(StreamStateHandle value) {
this.state = value;
this.referenceCount = 1;
}
- SharedStateHandle getState() {
+ StreamStateHandle getState() {
return state;
}
@@ -171,14 +225,56 @@ public class SharedStateRegistry {
}
}
- public int getReferenceCount(SharedStateHandle state) {
- if (state == null) {
- return 0;
+ /**
+ * The result of a request to register, obtain, or release a reference to shared state.
+ */
+ public static class Result {
+
+ /** The state handle registered under the key after the request, or null if it was released */
+ private final StreamStateHandle reference;
+
+ /** The reference count of the state handle after the request */
+ private final int referenceCount;
+ /** Captures the handle and reference count of the given entry at construction time. */
+ private Result(SharedStateEntry sharedStateEntry) {
+ this.reference = sharedStateEntry.getState();
+ this.referenceCount = sharedStateEntry.getReferenceCount();
}
- SharedStateRegistry.SharedStateEntry entry =
- registeredStates.get(state.getRegistrationKey());
+ public Result(StreamStateHandle reference, int referenceCount) {
+ Preconditions.checkArgument(referenceCount >= 0);
- return entry == null ? 0 : entry.getReferenceCount();
+ this.reference = reference;
+ this.referenceCount = referenceCount;
+ }
+ /** Returns the registered state handle, or null if the request released the last reference. */
+ public StreamStateHandle getReference() {
+ return reference;
+ }
+ /** Returns the reference count of the state handle after the request. */
+ public int getReferenceCount() {
+ return referenceCount;
+ }
+ }
+
+ /**
+ * Encapsulates the operation to delete a state handle; failures are logged, not rethrown.
+ */
+ private static final class AsyncDisposalRunnable implements Runnable {
+ /** The state object that {@link #run()} discards. */
+ private final StateObject toDispose;
+
+ public AsyncDisposalRunnable(StateObject toDispose) {
+ this.toDispose = Preconditions.checkNotNull(toDispose);
+ }
+
+ @Override
+ public void run() {
+ try {
+ toDispose.discardState();
+ } catch (Exception e) {
+ LOG.warn("A problem occurred during asynchronous disposal of a shared state object: {}", toDispose, e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
new file mode 100644
index 0000000..9e59359
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class represents a key that uniquely identifies (on a logical level) state handles for
+ * registration in the {@link SharedStateRegistry}. Two state handles that are logically the
+ * same should have the same {@link SharedStateRegistryKey}. The meaning of logical
+ * equivalence is up to the application.
+ */
+public class SharedStateRegistryKey implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The string representation of this key; equals() and hashCode() depend only on it. */
+ private final String keyString;
+ /** Creates a key from the given string, which must not be null. */
+ public SharedStateRegistryKey(String keyString) {
+ this.keyString = Preconditions.checkNotNull(keyString);
+ }
+ /** Returns the string this key was created from. */
+ public String getKeyString() {
+ return keyString;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SharedStateRegistryKey that = (SharedStateRegistryKey) o;
+ return keyString.equals(that.keyString);
+ }
+
+ @Override
+ public int hashCode() {
+ return keyString.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return keyString;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 821bb69..03e2a13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -19,9 +19,14 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.junit.Test;
+import java.io.IOException;
+
+import static junit.framework.TestCase.assertFalse;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class SharedStateRegistryTest {
@@ -30,24 +35,50 @@ public class SharedStateRegistryTest {
*/
@Test
public void testRegistryNormal() {
+
SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
// register one state
TestSharedState firstState = new TestSharedState("first");
- assertEquals(1, sharedStateRegistry.register(firstState));
+ SharedStateRegistry.Result result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstState);
+ assertEquals(1, result.getReferenceCount());
+ assertTrue(firstState == result.getReference());
+ assertFalse(firstState.isDiscarded());
// register another state
TestSharedState secondState = new TestSharedState("second");
- assertEquals(1, sharedStateRegistry.register(secondState));
-
- // register the first state again
- assertEquals(2, sharedStateRegistry.register(firstState));
+ result = sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), secondState);
+ assertEquals(1, result.getReferenceCount());
+ assertTrue(secondState == result.getReference());
+ assertFalse(firstState.isDiscarded());
+ assertFalse(secondState.isDiscarded());
+
+ // attempt to register a different handle under an existing key; the registry keeps the original
+ TestSharedState firstStatePrime = new TestSharedState(firstState.getRegistrationKey().getKeyString());
+ result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstStatePrime);
+ assertEquals(2, result.getReferenceCount());
+ assertFalse(firstStatePrime == result.getReference());
+ assertTrue(firstState == result.getReference());
+ assertTrue(firstStatePrime.isDiscarded()); // relies on the registry's current direct (synchronous) executor
+ assertFalse(firstState.isDiscarded());
+
+ // reference the first state again
+ result = sharedStateRegistry.obtainReference(firstState.getRegistrationKey());
+ assertEquals(3, result.getReferenceCount());
+ assertTrue(firstState == result.getReference());
+ assertFalse(firstState.isDiscarded());
// unregister the second state
- assertEquals(0, sharedStateRegistry.unregister(secondState));
+ result = sharedStateRegistry.releaseReference(secondState.getRegistrationKey());
+ assertEquals(0, result.getReferenceCount());
+ assertTrue(result.getReference() == null);
+ assertTrue(secondState.isDiscarded()); // relies on the registry's current direct (synchronous) executor
// unregister the first state
- assertEquals(1, sharedStateRegistry.unregister(firstState));
+ result = sharedStateRegistry.releaseReference(firstState.getRegistrationKey());
+ assertEquals(2, result.getReferenceCount()); // three references minus the one released here
+ assertTrue(firstState == result.getReference());
+ assertFalse(firstState.isDiscarded());
}
/**
@@ -56,51 +87,47 @@ public class SharedStateRegistryTest {
@Test(expected = IllegalStateException.class)
public void testUnregisterWithUnexistedKey() {
SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-
- sharedStateRegistry.unregister(new TestSharedState("unexisted"));
+ sharedStateRegistry.releaseReference(new SharedStateRegistryKey("non-existent")); // checkState fails for an unknown key
}
- private static class TestSharedState implements SharedStateHandle {
+ private static class TestSharedState implements StreamStateHandle {
private static final long serialVersionUID = 4468635881465159780L;
- private String key;
+ private SharedStateRegistryKey key;
+ /** Set by discardState() so tests can observe whether the registry disposed this handle. */
+ private boolean discarded;
TestSharedState(String key) {
- this.key = key;
+ this.key = new SharedStateRegistryKey(key);
+ this.discarded = false;
}
- @Override
- public String getRegistrationKey() {
+ public SharedStateRegistryKey getRegistrationKey() {
return key;
}
@Override
public void discardState() throws Exception {
- // nothing to do
+ this.discarded = true;
}
@Override
public long getStateSize() {
- return key.length();
+ return key.toString().length();
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TestSharedState testState = (TestSharedState) o;
-
- return key.equals(testState.key);
+ public int hashCode() { // no equals() override: identity equality makes a same-key handle a real duplicate
+ return key.hashCode();
}
@Override
- public int hashCode() {
- return key.hashCode();
+ public FSDataInputStream openInputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isDiscarded() {
+ return discarded;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 658ccde..ca66ffb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -482,6 +482,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
@SuppressWarnings("unchecked")
public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
Environment env = new DummyEnvironment("test", 1, 0);
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
@@ -509,6 +510,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
streamFactory,
CheckpointOptions.forFullCheckpoint()));
+ snapshot.registerSharedStates(sharedStateRegistry);
backend.dispose();
// ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
@@ -518,8 +520,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
- snapshot.discardState();
-
// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
// initializeSerializerUnlessSet would not pick up our new config
kvId = new ValueStateDescriptor<>("id", pojoType);
@@ -536,6 +536,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
streamFactory,
CheckpointOptions.forFullCheckpoint()));
+ snapshot2.registerSharedStates(sharedStateRegistry);
+
+ snapshot.unregisterSharedStates(sharedStateRegistry);
+ snapshot.discardState();
+
backend.dispose();
// ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) =========
@@ -570,6 +575,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
@Test
public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
Environment env = new DummyEnvironment("test", 1, 0);
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
@@ -597,6 +603,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
streamFactory,
CheckpointOptions.forFullCheckpoint()));
+ snapshot.registerSharedStates(sharedStateRegistry);
backend.dispose();
// ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
@@ -605,8 +612,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
- snapshot.discardState();
-
// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
// initializeSerializerUnlessSet would not pick up our new config
kvId = new ValueStateDescriptor<>("id", pojoType);
@@ -623,6 +628,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
streamFactory,
CheckpointOptions.forFullCheckpoint()));
+ snapshot2.registerSharedStates(sharedStateRegistry);
+
+ snapshot.unregisterSharedStates(sharedStateRegistry);
+ snapshot.discardState();
+
backend.dispose();
// ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========