You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/01/19 07:21:56 UTC

[flink] 02/02: [FLINK-25194] Implement an API for duplicating artefacts

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc1d63579ae5bcd9db207e1a5cd1b6365a87e871
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jan 18 16:12:42 2022 +0100

    [FLINK-25194] Implement an API for duplicating artefacts
---
 .../flink/core/fs/DuplicatingFileSystem.java       |  74 +++++++++++++++
 .../org/apache/flink/core/fs/EntropyInjector.java  |  21 ++++-
 .../runtime/state/CheckpointStateToolset.java      |  54 +++++++++++
 .../runtime/state/CheckpointStorageWorkerView.java |  11 +++
 .../runtime/state/CheckpointStreamFactory.java     |  28 ++++++
 .../NotDuplicatingCheckpointStateToolset.java      |  37 ++++++++
 .../state/filesystem/FsCheckpointStateToolset.java |  91 ++++++++++++++++++
 .../filesystem/FsCheckpointStorageAccess.java      |  13 +++
 .../filesystem/FsCheckpointStreamFactory.java      |  60 +++++++++++-
 .../state/memory/MemCheckpointStreamFactory.java   |  14 ++-
 .../MemoryBackendCheckpointStorageAccess.java      |   7 ++
 .../runtime/state/ChannelPersistenceITCase.java    |   5 +
 .../state/TestCheckpointStorageWorkerView.java     |   5 +
 ...tingCheckpointStorageAccessCoordinatorView.java |   5 +
 .../filesystem/FsCheckpointStateToolsetTest.java   | 103 +++++++++++++++++++++
 .../filesystem/FsCheckpointStorageAccessTest.java  |  42 +++++++++
 .../state/testutils/BackendForTestStream.java      |  21 +++++
 .../testutils/TestCheckpointStreamFactory.java     |  14 +++
 .../state/ttl/mock/MockCheckpointStorage.java      |  23 +++++
 .../util/BlockerCheckpointStreamFactory.java       |  13 +++
 .../state/NonCheckpointingStorageAccess.java       |   7 ++
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    |   6 ++
 .../UnalignedCheckpointFailureHandlingITCase.java  |  30 +++++-
 23 files changed, 675 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
new file mode 100644
index 0000000..3313fbc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An extension interface for {@link FileSystem FileSystems} that can perform a cheap, DFS-side
+ * duplicate operation. Such an operation can reduce the time required for cheaply creating
+ * independent snapshots from incremental snapshots.
+ */
+public interface DuplicatingFileSystem {
+    /**
+     * Tells if we can perform duplicate/copy between given paths.
+     *
+     * <p>This should be a rather cheap operation, preferably not involving any remote accesses. You
+     * can check e.g. if both paths are on the same host.
+     *
+     * @param source The path of the source file to duplicate
+     * @param destination The path where to duplicate the source file
+     * @return true, if we can perform the duplication
+     */
+    boolean canFastDuplicate(Path source, Path destination) throws IOException;
+
+    /**
+     * Duplicates each source path into its corresponding destination path.
+     *
+     * <p>You should first check if you can duplicate with {@link #canFastDuplicate(Path, Path)}.
+     *
+     * @param requests Pairs of src/dst to copy.
+     */
+    void duplicate(List<CopyRequest> requests) throws IOException;
+
+    /** A pair of source and destination to duplicate a file. */
+    interface CopyRequest {
+        /** The path of the source file to duplicate. */
+        Path getSource();
+
+        /** The path where to duplicate the source file. */
+        Path getDestination();
+
+        /** A factory method for creating a simple pair of source/destination. */
+        static CopyRequest of(Path source, Path destination) {
+            return new CopyRequest() {
+                @Override
+                public Path getSource() {
+                    return source;
+                }
+
+                @Override
+                public Path getDestination() {
+                    return destination;
+                }
+            };
+        }
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index d8e22d2..043d81c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -46,15 +46,30 @@ public class EntropyInjector {
      * EntropyInjectingFileSystem#getEntropyInjectionKey()}.
      *
      * <p>If the given file system does not implement {@code EntropyInjectingFileSystem}, then this
+     * method returns the same path.
+     */
+    public static Path addEntropy(FileSystem fs, Path path) throws IOException {
+        // check and possibly inject entropy into the path
+        final EntropyInjectingFileSystem efs = getEntropyFs(fs);
+        return efs == null ? path : resolveEntropy(path, efs, true);
+    }
+
+    /**
+     * Handles entropy injection across regular and entropy-aware file systems.
+     *
+     * <p>If the given file system is entropy-aware (i.e. implements {@link
+     * EntropyInjectingFileSystem}), then this method replaces the entropy marker in the path with
+     * random characters. The entropy marker is defined by {@link
+     * EntropyInjectingFileSystem#getEntropyInjectionKey()}.
+     *
+     * <p>If the given file system does not implement {@code EntropyInjectingFileSystem}, then this
      * method delegates to {@link FileSystem#create(Path, WriteMode)} and returns the same path in
      * the resulting {@code OutputStreamAndPath}.
      */
     public static OutputStreamAndPath createEntropyAware(
             FileSystem fs, Path path, WriteMode writeMode) throws IOException {
 
-        // check and possibly inject entropy into the path
-        final EntropyInjectingFileSystem efs = getEntropyFs(fs);
-        final Path processedPath = efs == null ? path : resolveEntropy(path, efs, true);
+        final Path processedPath = addEntropy(fs, path);
 
         // create the stream on the original file system to let the safety net
         // take its effect
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java
new file mode 100644
index 0000000..eb21b9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A toolset of operations that can be performed on a location embedded within the class. Created in
+ * {@link CheckpointStorageWorkerView}.
+ */
+@Internal
+public interface CheckpointStateToolset {
+
+    /**
+     * Tells if we can duplicate the given {@link StreamStateHandle}.
+     *
+     * <p>This should be a rather cheap operation, preferably not involving any remote accesses.
+     *
+     * @param stateHandle The handle to duplicate
+     * @return true, if we can perform the duplication
+     */
+    boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException;
+
+    /**
+     * Duplicates {@link StreamStateHandle StreamStateHandles} into the path embedded inside of the
+     * class.
+     *
+     * <p>You should first check if you can duplicate with {@link
+     * #canFastDuplicate(StreamStateHandle)}.
+     *
+     * @param stateHandle The handles to duplicate
+     * @return The duplicated handles
+     */
+    List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandle) throws IOException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
index 39d4a74..3e7e659 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Internal;
+
 import java.io.IOException;
 
 /**
@@ -27,6 +29,7 @@ import java.io.IOException;
  *
  * <p>Methods of this interface act as a worker role in task manager.
  */
+@Internal
 public interface CheckpointStorageWorkerView {
 
     /**
@@ -64,4 +67,12 @@ public interface CheckpointStorageWorkerView {
      * @throws IOException Thrown, if the stream cannot be opened.
      */
     CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
+
+    /**
+     * A complementary method to {@link #createTaskOwnedStateStream()}. Creates a toolset that gives
+     * access to additional operations that can be performed in the task owned state location.
+     *
+     * @return A toolset for additional operations for state owned by tasks.
+     */
+    CheckpointStateToolset createTaskOwnedCheckpointStateToolset();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index c905438..96c9599 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A factory for checkpoint output streams, which are used to persist data for checkpoints.
@@ -39,4 +40,31 @@ public interface CheckpointStreamFactory {
      */
     CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
             throws IOException;
+
+    /**
+     * Tells if we can duplicate the given {@link StreamStateHandle} into the path corresponding to
+     * the given {@link CheckpointedStateScope}.
+     *
+     * <p>This should be a rather cheap operation, preferably not involving any remote accesses.
+     *
+     * @param stateHandle The handle to duplicate
+     * @param scope Scope determining the location to duplicate into
+     * @return true, if we can perform the duplication
+     */
+    boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope)
+            throws IOException;
+
+    /**
+     * Duplicates {@link StreamStateHandle} into the path corresponding to the given {@link
+     * CheckpointedStateScope}.
+     *
+     * <p>You should first check if you can duplicate with {@link
+     * #canFastDuplicate(StreamStateHandle, CheckpointedStateScope)}.
+     *
+     * @param stateHandles The handles to duplicate
+     * @param scope Scope determining the location to duplicate into
+     * @return The duplicated handles
+     */
+    List<StreamStateHandle> duplicate(
+            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java
new file mode 100644
index 0000000..354c821
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+/** A {@link CheckpointStateToolset} for storages that cannot duplicate state handles. */
+public final class NotDuplicatingCheckpointStateToolset implements CheckpointStateToolset {
+    @Override
+    public boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException {
+        return false;
+    }
+
+    @Override
+    public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandle)
+            throws IOException {
+        throw new UnsupportedOperationException("The toolset does not support duplication");
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
new file mode 100644
index 0000000..1f8423c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.DuplicatingFileSystem;
+import org.apache.flink.core.fs.DuplicatingFileSystem.CopyRequest;
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * An implementation of {@link CheckpointStateToolset} that does file-based duplicating with a
+ * {@link DuplicatingFileSystem}.
+ */
+public class FsCheckpointStateToolset implements CheckpointStateToolset {
+
+    private final Path basePath;
+    private final DuplicatingFileSystem fs;
+
+    public FsCheckpointStateToolset(Path basePath, DuplicatingFileSystem fs) {
+        this.basePath = basePath;
+        this.fs = fs;
+    }
+
+    @Override
+    public boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException {
+        if (!(stateHandle instanceof FileStateHandle)) {
+            return false;
+        }
+        final Path srcPath = ((FileStateHandle) stateHandle).getFilePath();
+        final Path dst = getNewDstPath(srcPath.getName());
+        return fs.canFastDuplicate(srcPath, dst);
+    }
+
+    @Override
+    public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles)
+            throws IOException {
+
+        final List<CopyRequest> requests = new ArrayList<>();
+        for (StreamStateHandle handle : stateHandles) {
+            if (!(handle instanceof FileStateHandle)) {
+                throw new IllegalArgumentException("We can duplicate only FileStateHandles.");
+            }
+            final Path srcPath = ((FileStateHandle) handle).getFilePath();
+            requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
+        }
+        fs.duplicate(requests);
+
+        return IntStream.range(0, stateHandles.size())
+                .mapToObj(
+                        idx -> {
+                            final StreamStateHandle originalHandle = stateHandles.get(idx);
+                            final Path dst = requests.get(idx).getDestination();
+                            if (originalHandle instanceof RelativeFileStateHandle) {
+                                return new RelativeFileStateHandle(
+                                        dst, dst.getName(), originalHandle.getStateSize());
+                            } else {
+                                return new FileStateHandle(dst, originalHandle.getStateSize());
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private Path getNewDstPath(String fileName) throws IOException {
+        final Path dst = new Path(basePath, fileName);
+        return EntropyInjector.addEntropy(dst.getFileSystem(), dst);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index 1c3ebb2..af37324 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -20,12 +20,15 @@ package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.DuplicatingFileSystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 
 import javax.annotation.Nullable;
@@ -177,6 +180,16 @@ public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess
     }
 
     @Override
+    public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+        if (fileSystem instanceof DuplicatingFileSystem) {
+            return new FsCheckpointStateToolset(
+                    taskOwnedStateDirectory, (DuplicatingFileSystem) fileSystem);
+        } else {
+            return new NotDuplicatingCheckpointStateToolset();
+        }
+    }
+
+    @Override
     protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) {
         final CheckpointStorageLocationReference reference = encodePathAsReference(location);
         return new FsCheckpointStorageLocation(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 5151130..8774848 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.core.fs.DuplicatingFileSystem;
 import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -37,6 +38,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -87,6 +89,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
     /** Whether the file system dynamically injects entropy into the file paths. */
     private final boolean entropyInjecting;
 
+    private final FsCheckpointStateToolset privateStateToolset;
+
+    private final FsCheckpointStateToolset sharedStateToolset;
+
     /**
      * Creates a new stream factory that stores its checkpoint data in the file system and location
      * defined by the given Path.
@@ -130,6 +136,16 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
         this.fileStateThreshold = fileStateSizeThreshold;
         this.writeBufferSize = writeBufferSize;
         this.entropyInjecting = EntropyInjector.isEntropyInjecting(fileSystem);
+        if (fileSystem instanceof DuplicatingFileSystem) {
+            final DuplicatingFileSystem duplicatingFileSystem = (DuplicatingFileSystem) fileSystem;
+            this.privateStateToolset =
+                    new FsCheckpointStateToolset(checkpointDirectory, duplicatingFileSystem);
+            this.sharedStateToolset =
+                    new FsCheckpointStateToolset(sharedStateDirectory, duplicatingFileSystem);
+        } else {
+            this.privateStateToolset = null;
+            this.sharedStateToolset = null;
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -137,10 +153,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
     @Override
     public FsCheckpointStateOutputStream createCheckpointStateOutputStream(
             CheckpointedStateScope scope) throws IOException {
-        Path target =
-                scope == CheckpointedStateScope.EXCLUSIVE
-                        ? checkpointDirectory
-                        : sharedStateDirectory;
+        Path target = getTargetPath(scope);
         int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
 
         final boolean absolutePath = entropyInjecting || scope == CheckpointedStateScope.SHARED;
@@ -148,6 +161,45 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
                 target, filesystem, bufferSize, fileStateThreshold, !absolutePath);
     }
 
+    private Path getTargetPath(CheckpointedStateScope scope) {
+        return scope == CheckpointedStateScope.EXCLUSIVE
+                ? checkpointDirectory
+                : sharedStateDirectory;
+    }
+
+    @Override
+    public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope)
+            throws IOException {
+        if (privateStateToolset == null || sharedStateToolset == null) {
+            return false;
+        }
+        switch (scope) {
+            case EXCLUSIVE:
+                return privateStateToolset.canFastDuplicate(stateHandle);
+            case SHARED:
+                return sharedStateToolset.canFastDuplicate(stateHandle);
+        }
+        return false;
+    }
+
+    @Override
+    public List<StreamStateHandle> duplicate(
+            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+
+        if (privateStateToolset == null || sharedStateToolset == null) {
+            throw new IllegalArgumentException("The underlying FS does not support duplication.");
+        }
+
+        switch (scope) {
+            case EXCLUSIVE:
+                return privateStateToolset.duplicate(stateHandles);
+            case SHARED:
+                return sharedStateToolset.duplicate(stateHandles);
+            default:
+                throw new IllegalArgumentException("Unknown state scope: " + scope);
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  utilities
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 98d09be..c9b5088 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -28,13 +28,14 @@ import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /** {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. */
 public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 
-    /** The maximal size that the snapshotted memory state may have */
+    /** The maximal size that the snapshotted memory state may have. */
     private final int maxStateSize;
 
     /**
@@ -54,6 +55,17 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
     }
 
     @Override
+    public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+        return false;
+    }
+
+    @Override
+    public List<StreamStateHandle> duplicate(
+            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+        throw new UnsupportedOperationException("We can not duplicate handles in memory.");
+    }
+
+    @Override
     public String toString() {
         return "In-Memory Stream Factory";
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
index 10e33a9..46ae6c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
@@ -23,9 +23,11 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
 
@@ -157,6 +159,11 @@ public class MemoryBackendCheckpointStorageAccess extends AbstractFsCheckpointSt
     }
 
     @Override
+    public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+        return new NotDuplicatingCheckpointStateToolset();
+    }
+
+    @Override
     protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) {
         return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index 8363e2e..f9cc467 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -271,6 +271,11 @@ public class ChannelPersistenceITCase {
             public CheckpointStateOutputStream createTaskOwnedStateStream() {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+                throw new UnsupportedOperationException();
+            }
         };
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
index 9b8d298..afe955d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
@@ -53,4 +53,9 @@ public class TestCheckpointStorageWorkerView implements CheckpointStorageWorkerV
         return taskOwnedCheckpointStreamFactory.createCheckpointStateOutputStream(
                 taskOwnedStateScope);
     }
+
+    @Override
+    public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+        return new NotDuplicatingCheckpointStateToolset();
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
index 9e05949..9213a51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
@@ -97,6 +97,11 @@ public class TestingCheckpointStorageAccessCoordinatorView
         return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE);
     }
 
+    @Override
+    public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+        return new NotDuplicatingCheckpointStateToolset();
+    }
+
     // ------------------------------------------------------------------------
     //  Utils
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
new file mode 100644
index 0000000..d9dafe5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.DuplicatingFileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FsCheckpointStateToolset}. */
+class FsCheckpointStateToolsetTest {
+    @Test
+    void testCanDuplicateNonFileStreamHandle() throws IOException {
+        final FsCheckpointStateToolset stateToolset =
+                new FsCheckpointStateToolset(
+                        new Path("test-path"), new TestDuplicatingFileSystem());
+
+        final boolean canFastDuplicate =
+                stateToolset.canFastDuplicate(new ByteStreamStateHandle("test", new byte[] {}));
+        assertThat(canFastDuplicate).isFalse();
+    }
+
+    @Test
+    void testCanDuplicate() throws IOException {
+        final FsCheckpointStateToolset stateToolset =
+                new FsCheckpointStateToolset(
+                        new Path("test-path"), new TestDuplicatingFileSystem());
+
+        final boolean canFastDuplicate =
+                stateToolset.canFastDuplicate(
+                        new FileStateHandle(new Path("old-test-path", "test-file"), 0));
+        assertThat(canFastDuplicate).isTrue();
+    }
+
+    @Test
+    void testCannotDuplicate() throws IOException {
+        final FsCheckpointStateToolset stateToolset =
+                new FsCheckpointStateToolset(
+                        new Path("test-path"), new TestDuplicatingFileSystem());
+
+        final boolean canFastDuplicate =
+                stateToolset.canFastDuplicate(
+                        new FileStateHandle(new Path("test-path", "test-file"), 0));
+        assertThat(canFastDuplicate).isFalse();
+    }
+
+    @Test
+    void testDuplicating() throws IOException {
+        final TestDuplicatingFileSystem fs = new TestDuplicatingFileSystem();
+        final FsCheckpointStateToolset stateToolset =
+                new FsCheckpointStateToolset(new Path("test-path"), fs);
+
+        final List<StreamStateHandle> duplicated =
+                stateToolset.duplicate(
+                        Arrays.asList(
+                                new FileStateHandle(new Path("old-test-path", "test-file1"), 0),
+                                new FileStateHandle(new Path("old-test-path", "test-file2"), 0),
+                                new RelativeFileStateHandle(
+                                        new Path("old-test-path", "test-file3"), "test-file3", 0)));
+
+        assertThat(duplicated)
+                .containsExactly(
+                        new FileStateHandle(new Path("test-path", "test-file1"), 0),
+                        new FileStateHandle(new Path("test-path", "test-file2"), 0),
+                        new RelativeFileStateHandle(
+                                new Path("test-path", "test-file3"), "test-file3", 0));
+    }
+
+    private static final class TestDuplicatingFileSystem implements DuplicatingFileSystem {
+
+        @Override
+        public boolean canFastDuplicate(Path source, Path destination) throws IOException {
+            return !source.equals(destination);
+        }
+
+        @Override
+        public void duplicate(List<CopyRequest> requests) throws IOException {}
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 7b24f3d..2c2fbdc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.DuplicatingFileSystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
@@ -26,19 +27,24 @@ import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.apache.flink.testutils.TestFileSystem;
 
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -280,6 +286,42 @@ public class FsCheckpointStorageAccessTest extends AbstractFileCheckpointStorage
         assertTrue(fileSystem instanceof LocalFileSystem);
     }
 
+    @Test
+    public void testNotDuplicationCheckpointStateToolset() throws Exception {
+        CheckpointStorageAccess checkpointStorage = createCheckpointStorage(randomTempPath());
+        assertThat(
+                checkpointStorage.createTaskOwnedCheckpointStateToolset(),
+                instanceOf(NotDuplicatingCheckpointStateToolset.class));
+    }
+
+    @Test
+    public void testDuplicationCheckpointStateToolset() throws Exception {
+        CheckpointStorageAccess checkpointStorage =
+                new FsCheckpointStorageAccess(
+                        new TestDuplicatingFileSystem(),
+                        randomTempPath(),
+                        null,
+                        new JobID(),
+                        FILE_SIZE_THRESHOLD,
+                        WRITE_BUFFER_SIZE);
+
+        assertThat(
+                checkpointStorage.createTaskOwnedCheckpointStateToolset(),
+                instanceOf(FsCheckpointStateToolset.class));
+    }
+
+    private static final class TestDuplicatingFileSystem extends TestFileSystem
+            implements DuplicatingFileSystem {
+
+        @Override
+        public boolean canFastDuplicate(Path source, Path destination) throws IOException {
+            return !source.equals(destination);
+        }
+
+        @Override
+        public void duplicate(List<CopyRequest> requests) throws IOException {}
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index 0331d5b..5ad7903 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -21,18 +21,21 @@ package org.apache.flink.runtime.state.testutils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.function.SupplierWithException;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -114,6 +117,11 @@ public class BackendForTestStream extends MemoryStateBackend {
         public CheckpointStateOutputStream createTaskOwnedStateStream() {
             throw new UnsupportedOperationException();
         }
+
+        @Override
+        public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+            throw new UnsupportedOperationException();
+        }
     }
 
     private static final class TestFactory
@@ -130,5 +138,18 @@ public class BackendForTestStream extends MemoryStateBackend {
                 CheckpointedStateScope scope) throws IOException {
             return streamFactory.get();
         }
+
+        @Override
+        public boolean canFastDuplicate(
+                StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+            return false;
+        }
+
+        @Override
+        public List<StreamStateHandle> duplicate(
+                List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+                throws IOException {
+            throw new UnsupportedOperationException();
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
index 80f36b2..0f16c73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
@@ -21,7 +21,10 @@ package org.apache.flink.runtime.state.testutils;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -42,4 +45,15 @@ public class TestCheckpointStreamFactory implements CheckpointStreamFactory {
             CheckpointedStateScope scope) {
         return supplier.get();
     }
+
+    @Override
+    public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+        return false;
+    }
+
+    @Override
+    public List<StreamStateHandle> duplicate(
+            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
index fb4a8a0..91e78f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.ttl.mock;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -28,9 +29,13 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.util.List;
+
 public class MockCheckpointStorage implements CheckpointStorage {
     @Override
     public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
@@ -69,6 +74,19 @@ public class MockCheckpointStorage implements CheckpointStorage {
                     }
 
                     @Override
+                    public boolean canFastDuplicate(
+                            StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+                        return false;
+                    }
+
+                    @Override
+                    public List<StreamStateHandle> duplicate(
+                            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+                            throws IOException {
+                        return null;
+                    }
+
+                    @Override
                     public CheckpointMetadataOutputStream createMetadataOutputStream() {
                         return null;
                     }
@@ -99,6 +117,11 @@ public class MockCheckpointStorage implements CheckpointStorage {
             public CheckpointStateOutputStream createTaskOwnedStateStream() {
                 return null;
             }
+
+            @Override
+            public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+                return null;
+            }
         };
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 021e340..8f16b5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -24,10 +24,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /** {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO. */
@@ -86,4 +88,15 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 
         return blockingStream;
     }
+
+    @Override
+    public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+        return false;
+    }
+
+    @Override
+    public List<StreamStateHandle> duplicate(
+            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
index 3422850..8986a5c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators.sorted.state;
 
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -73,4 +74,10 @@ class NonCheckpointingStorageAccess implements CheckpointStorageAccess {
         throw new UnsupportedOperationException(
                 "Checkpoints are not supported in a single key state backend");
     }
+
+    @Override
+    public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+        throw new UnsupportedOperationException(
+                "Checkpoints are not supported in a single key state backend");
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index bcb4bdd..7c96d49 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -717,6 +718,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
             return delegate.createTaskOwnedStateStream();
         }
+
+        @Override
+        public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+            return delegate.createTaskOwnedCheckpointStateToolset();
+        }
     }
 
     private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
index ef7fc8a..81a1b78 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -24,11 +24,13 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
@@ -51,7 +53,9 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.time.Duration;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -224,7 +228,26 @@ public class UnalignedCheckpointFailureHandlingITCase {
         @Override
         public CheckpointStreamFactory resolveCheckpointStorageLocation(
                 long checkpointId, CheckpointStorageLocationReference reference) {
-            return ign -> new FailingOnceFsCheckpointOutputStream(path, 100, 0, failOnClose);
+            return new CheckpointStreamFactory() {
+                @Override
+                public CheckpointStateOutputStream createCheckpointStateOutputStream(
+                        CheckpointedStateScope scope) throws IOException {
+                    return new FailingOnceFsCheckpointOutputStream(path, 100, 0, failOnClose);
+                }
+
+                @Override
+                public boolean canFastDuplicate(
+                        StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+                    return false;
+                }
+
+                @Override
+                public List<StreamStateHandle> duplicate(
+                        List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+                        throws IOException {
+                    throw new UnsupportedOperationException(); // not supported by this test stub
+                }
+            };
         }
 
         @Override
@@ -233,6 +256,11 @@ public class UnalignedCheckpointFailureHandlingITCase {
         }
 
         @Override
+        public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+            return delegate.createTaskOwnedCheckpointStateToolset();
+        }
+
+        @Override
         public boolean supportsHighlyAvailableStorage() {
             return delegate.supportsHighlyAvailableStorage();
         }