You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/01/19 07:21:56 UTC
[flink] 02/02: [FLINK-25194] Implement an API for duplicating artefacts
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc1d63579ae5bcd9db207e1a5cd1b6365a87e871
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jan 18 16:12:42 2022 +0100
[FLINK-25194] Implement an API for duplicating artefacts
---
.../flink/core/fs/DuplicatingFileSystem.java | 74 +++++++++++++++
.../org/apache/flink/core/fs/EntropyInjector.java | 21 ++++-
.../runtime/state/CheckpointStateToolset.java | 54 +++++++++++
.../runtime/state/CheckpointStorageWorkerView.java | 11 +++
.../runtime/state/CheckpointStreamFactory.java | 28 ++++++
.../NotDuplicatingCheckpointStateToolset.java | 37 ++++++++
.../state/filesystem/FsCheckpointStateToolset.java | 91 ++++++++++++++++++
.../filesystem/FsCheckpointStorageAccess.java | 13 +++
.../filesystem/FsCheckpointStreamFactory.java | 60 +++++++++++-
.../state/memory/MemCheckpointStreamFactory.java | 14 ++-
.../MemoryBackendCheckpointStorageAccess.java | 7 ++
.../runtime/state/ChannelPersistenceITCase.java | 5 +
.../state/TestCheckpointStorageWorkerView.java | 5 +
...tingCheckpointStorageAccessCoordinatorView.java | 5 +
.../filesystem/FsCheckpointStateToolsetTest.java | 103 +++++++++++++++++++++
.../filesystem/FsCheckpointStorageAccessTest.java | 42 +++++++++
.../state/testutils/BackendForTestStream.java | 21 +++++
.../testutils/TestCheckpointStreamFactory.java | 14 +++
.../state/ttl/mock/MockCheckpointStorage.java | 23 +++++
.../util/BlockerCheckpointStreamFactory.java | 13 +++
.../state/NonCheckpointingStorageAccess.java | 7 ++
.../tasks/SubtaskCheckpointCoordinatorImpl.java | 6 ++
.../UnalignedCheckpointFailureHandlingITCase.java | 30 +++++-
23 files changed, 675 insertions(+), 9 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
new file mode 100644
index 0000000..3313fbc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An extension interface for {@link FileSystem FileSystems} that can perform cheap DFS side
+ * duplicate operation. Such an operation can improve the time required for creating cheaply
+ * independent snapshots from incremental snapshots.
+ */
+public interface DuplicatingFileSystem {
+ /**
+ * Tells if we can perform duplicate/copy between given paths.
+ *
+ * <p>This should be a rather cheap operation, preferably not involving any remote accesses. You
+ * can check e.g. if both paths are on the same host.
+ *
+ * @param source The path of the source file to duplicate
+ * @param destination The path where to duplicate the source file
+ * @return true, if we can perform the duplication
+ */
+ boolean canFastDuplicate(Path source, Path destination) throws IOException;
+
+ /**
+ * Duplicates the source path into the destination path.
+ *
+ * <p>You should first check if you can duplicate with {@link #canFastDuplicate(Path, Path)}.
+ *
+ * @param requests Pairs of src/dst to copy.
+ */
+ void duplicate(List<CopyRequest> requests) throws IOException;
+
+ /** A pair of source and destination to duplicate a file. */
+ interface CopyRequest {
+ /** The path of the source file to duplicate. */
+ Path getSource();
+
+ /** The path where to duplicate the source file. */
+ Path getDestination();
+
+ /** A factory method for creating a simple pair of source/destination. */
+ static CopyRequest of(Path source, Path destination) {
+ return new CopyRequest() {
+ @Override
+ public Path getSource() {
+ return source;
+ }
+
+ @Override
+ public Path getDestination() {
+ return destination;
+ }
+ };
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index d8e22d2..043d81c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -46,15 +46,30 @@ public class EntropyInjector {
* EntropyInjectingFileSystem#getEntropyInjectionKey()}.
*
* <p>If the given file system does not implement {@code EntropyInjectingFileSystem}, then this
+ * method returns the same path.
+ */
+ public static Path addEntropy(FileSystem fs, Path path) throws IOException {
+ // check and possibly inject entropy into the path
+ final EntropyInjectingFileSystem efs = getEntropyFs(fs);
+ return efs == null ? path : resolveEntropy(path, efs, true);
+ }
+
+ /**
+ * Handles entropy injection across regular and entropy-aware file systems.
+ *
+ * <p>If the given file system is entropy-aware (a implements {@link
+ * EntropyInjectingFileSystem}), then this method replaces the entropy marker in the path with
+ * random characters. The entropy marker is defined by {@link
+ * EntropyInjectingFileSystem#getEntropyInjectionKey()}.
+ *
+ * <p>If the given file system does not implement {@code EntropyInjectingFileSystem}, then this
* method delegates to {@link FileSystem#create(Path, WriteMode)} and returns the same path in
* the resulting {@code OutputStreamAndPath}.
*/
public static OutputStreamAndPath createEntropyAware(
FileSystem fs, Path path, WriteMode writeMode) throws IOException {
- // check and possibly inject entropy into the path
- final EntropyInjectingFileSystem efs = getEntropyFs(fs);
- final Path processedPath = efs == null ? path : resolveEntropy(path, efs, true);
+ final Path processedPath = addEntropy(fs, path);
// create the stream on the original file system to let the safety net
// take its effect
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java
new file mode 100644
index 0000000..eb21b9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateToolset.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A toolset of operations that can be performed on a location embedded within the class. Created in
+ * {@link CheckpointStorageWorkerView}.
+ */
+@Internal
+public interface CheckpointStateToolset {
+
+ /**
+ * Tells if we can duplicate the given {@link StreamStateHandle}.
+ *
+ * <p>This should be a rather cheap operation, preferably not involving any remote accesses.
+ *
+ * @param stateHandle The handle to duplicate
+ * @return true, if we can perform the duplication
+ */
+ boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException;
+
+ /**
+ * Duplicates {@link StreamStateHandle StreamStateHandles} into the path embedded inside of the
+ * class.
+ *
+ * <p>You should first check if you can duplicate with {@link
+ * #canFastDuplicate(StreamStateHandle)}.
+ *
+ * @param stateHandle The handles to duplicate
+ * @return The duplicated handles
+ */
+ List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandle) throws IOException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
index 39d4a74..3e7e659 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.Internal;
+
import java.io.IOException;
/**
@@ -27,6 +29,7 @@ import java.io.IOException;
*
* <p>Methods of this interface act as a worker role in task manager.
*/
+@Internal
public interface CheckpointStorageWorkerView {
/**
@@ -64,4 +67,12 @@ public interface CheckpointStorageWorkerView {
* @throws IOException Thrown, if the stream cannot be opened.
*/
CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
+
+ /**
+ * A complementary method to {@link #createTaskOwnedStateStream()}. Creates a toolset that gives
+ * access to additional operations that can be performed in the task owned state location.
+ *
+ * @return A toolset for additional operations for state owned by tasks.
+ */
+ CheckpointStateToolset createTaskOwnedCheckpointStateToolset();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index c905438..96c9599 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import java.io.IOException;
+import java.util.List;
/**
* A factory for checkpoint output streams, which are used to persist data for checkpoints.
@@ -39,4 +40,31 @@ public interface CheckpointStreamFactory {
*/
CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
throws IOException;
+
+ /**
+ * Tells if we can duplicate the given {@link StreamStateHandle} into the path corresponding to
+ * the given {@link CheckpointedStateScope}.
+ *
+ * <p>This should be a rather cheap operation, preferably not involving any remote accesses.
+ *
+ * @param stateHandle The handle to duplicate
+ * @param scope Scope determining the location to duplicate into
+ * @return true, if we can perform the duplication
+ */
+ boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope)
+ throws IOException;
+
+ /**
+ * Duplicates {@link StreamStateHandle} into the path corresponding to * the given {@link
+ * CheckpointedStateScope}.
+ *
+ * <p>You should first check if you can duplicate with {@link
+ * #canFastDuplicate(StreamStateHandle, CheckpointedStateScope)}.
+ *
+ * @param stateHandles The handles to duplicate
+ * @param scope Scope determining the location to duplicate into
+ * @return The duplicated handle
+ */
+ List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java
new file mode 100644
index 0000000..354c821
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NotDuplicatingCheckpointStateToolset.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+/** An empty implementation of {@link CheckpointStateToolset}. */
+public final class NotDuplicatingCheckpointStateToolset implements CheckpointStateToolset {
+ @Override
+ public boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandle)
+ throws IOException {
+ throw new UnsupportedEncodingException("The toolset does not support duplication");
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
new file mode 100644
index 0000000..1f8423c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.DuplicatingFileSystem;
+import org.apache.flink.core.fs.DuplicatingFileSystem.CopyRequest;
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * An implementation of {@link CheckpointStateToolset} that does file based duplicating with as
+ * {@link DuplicatingFileSystem}.
+ */
+public class FsCheckpointStateToolset implements CheckpointStateToolset {
+
+ private final Path basePath;
+ private final DuplicatingFileSystem fs;
+
+ public FsCheckpointStateToolset(Path basePath, DuplicatingFileSystem fs) {
+ this.basePath = basePath;
+ this.fs = fs;
+ }
+
+ @Override
+ public boolean canFastDuplicate(StreamStateHandle stateHandle) throws IOException {
+ if (!(stateHandle instanceof FileStateHandle)) {
+ return false;
+ }
+ final Path srcPath = ((FileStateHandle) stateHandle).getFilePath();
+ final Path dst = getNewDstPath(srcPath.getName());
+ return fs.canFastDuplicate(srcPath, dst);
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(List<StreamStateHandle> stateHandles)
+ throws IOException {
+
+ final List<CopyRequest> requests = new ArrayList<>();
+ for (StreamStateHandle handle : stateHandles) {
+ if (!(handle instanceof FileStateHandle)) {
+ throw new IllegalArgumentException("We can duplicate only FileStateHandles.");
+ }
+ final Path srcPath = ((FileStateHandle) handle).getFilePath();
+ requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
+ }
+ fs.duplicate(requests);
+
+ return IntStream.range(0, stateHandles.size())
+ .mapToObj(
+ idx -> {
+ final StreamStateHandle originalHandle = stateHandles.get(idx);
+ final Path dst = requests.get(idx).getDestination();
+ if (originalHandle instanceof RelativeFileStateHandle) {
+ return new RelativeFileStateHandle(
+ dst, dst.getName(), originalHandle.getStateSize());
+ } else {
+ return new FileStateHandle(dst, originalHandle.getStateSize());
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ private Path getNewDstPath(String fileName) throws IOException {
+ final Path dst = new Path(basePath, fileName);
+ return EntropyInjector.addEntropy(dst.getFileSystem(), dst);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index 1c3ebb2..af37324 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -20,12 +20,15 @@ package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import javax.annotation.Nullable;
@@ -177,6 +180,16 @@ public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess
}
@Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ if (fileSystem instanceof DuplicatingFileSystem) {
+ return new FsCheckpointStateToolset(
+ taskOwnedStateDirectory, (DuplicatingFileSystem) fileSystem);
+ } else {
+ return new NotDuplicatingCheckpointStateToolset();
+ }
+ }
+
+ @Override
protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) {
final CheckpointStorageLocationReference reference = encodePathAsReference(location);
return new FsCheckpointStorageLocation(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 5151130..8774848 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
@@ -37,6 +38,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -87,6 +89,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
/** Whether the file system dynamically injects entropy into the file paths. */
private final boolean entropyInjecting;
+ private final FsCheckpointStateToolset privateStateToolset;
+
+ private final FsCheckpointStateToolset sharedStateToolset;
+
/**
* Creates a new stream factory that stores its checkpoint data in the file system and location
* defined by the given Path.
@@ -130,6 +136,16 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
this.fileStateThreshold = fileStateSizeThreshold;
this.writeBufferSize = writeBufferSize;
this.entropyInjecting = EntropyInjector.isEntropyInjecting(fileSystem);
+ if (fileSystem instanceof DuplicatingFileSystem) {
+ final DuplicatingFileSystem duplicatingFileSystem = (DuplicatingFileSystem) fileSystem;
+ this.privateStateToolset =
+ new FsCheckpointStateToolset(checkpointDirectory, duplicatingFileSystem);
+ this.sharedStateToolset =
+ new FsCheckpointStateToolset(sharedStateDirectory, duplicatingFileSystem);
+ } else {
+ this.privateStateToolset = null;
+ this.sharedStateToolset = null;
+ }
}
// ------------------------------------------------------------------------
@@ -137,10 +153,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(
CheckpointedStateScope scope) throws IOException {
- Path target =
- scope == CheckpointedStateScope.EXCLUSIVE
- ? checkpointDirectory
- : sharedStateDirectory;
+ Path target = getTargetPath(scope);
int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
final boolean absolutePath = entropyInjecting || scope == CheckpointedStateScope.SHARED;
@@ -148,6 +161,45 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
target, filesystem, bufferSize, fileStateThreshold, !absolutePath);
}
+ private Path getTargetPath(CheckpointedStateScope scope) {
+ return scope == CheckpointedStateScope.EXCLUSIVE
+ ? checkpointDirectory
+ : sharedStateDirectory;
+ }
+
+ @Override
+ public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope)
+ throws IOException {
+ if (privateStateToolset == null || sharedStateToolset == null) {
+ return false;
+ }
+ switch (scope) {
+ case EXCLUSIVE:
+ return privateStateToolset.canFastDuplicate(stateHandle);
+ case SHARED:
+ return sharedStateToolset.canFastDuplicate(stateHandle);
+ }
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+
+ if (privateStateToolset == null || sharedStateToolset == null) {
+ throw new IllegalArgumentException("The underlying FS does not support duplication.");
+ }
+
+ switch (scope) {
+ case EXCLUSIVE:
+ return privateStateToolset.duplicate(stateHandles);
+ case SHARED:
+ return sharedStateToolset.duplicate(stateHandles);
+ default:
+ throw new IllegalArgumentException("Unknown state scope: " + scope);
+ }
+ }
+
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 98d09be..c9b5088 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -28,13 +28,14 @@ import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
/** {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. */
public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
- /** The maximal size that the snapshotted memory state may have */
+ /** The maximal size that the snapshotted memory state may have. */
private final int maxStateSize;
/**
@@ -54,6 +55,17 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
}
@Override
+ public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+ throw new UnsupportedOperationException("We can not duplicate handles in memory.");
+ }
+
+ @Override
public String toString() {
return "In-Memory Stream Factory";
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
index 10e33a9..46ae6c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
@@ -23,9 +23,11 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
@@ -157,6 +159,11 @@ public class MemoryBackendCheckpointStorageAccess extends AbstractFsCheckpointSt
}
@Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ return new NotDuplicatingCheckpointStateToolset();
+ }
+
+ @Override
protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) {
return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index 8363e2e..f9cc467 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -271,6 +271,11 @@ public class ChannelPersistenceITCase {
public CheckpointStateOutputStream createTaskOwnedStateStream() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ throw new UnsupportedOperationException();
+ }
};
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
index 9b8d298..afe955d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
@@ -53,4 +53,9 @@ public class TestCheckpointStorageWorkerView implements CheckpointStorageWorkerV
return taskOwnedCheckpointStreamFactory.createCheckpointStateOutputStream(
taskOwnedStateScope);
}
+
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ return new NotDuplicatingCheckpointStateToolset();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
index 9e05949..9213a51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
@@ -97,6 +97,11 @@ public class TestingCheckpointStorageAccessCoordinatorView
return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE);
}
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ return new NotDuplicatingCheckpointStateToolset();
+ }
+
// ------------------------------------------------------------------------
// Utils
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
new file mode 100644
index 0000000..d9dafe5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.DuplicatingFileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FsCheckpointStateToolset}. */
+class FsCheckpointStateToolsetTest {
+ @Test
+ void testCanDuplicateNonFileStreamHandle() throws IOException {
+ final FsCheckpointStateToolset stateToolset =
+ new FsCheckpointStateToolset(
+ new Path("test-path"), new TestDuplicatingFileSystem());
+
+ final boolean canFastDuplicate =
+ stateToolset.canFastDuplicate(new ByteStreamStateHandle("test", new byte[] {}));
+ assertThat(canFastDuplicate).isFalse();
+ }
+
+ @Test
+ void testCanDuplicate() throws IOException {
+ final FsCheckpointStateToolset stateToolset =
+ new FsCheckpointStateToolset(
+ new Path("test-path"), new TestDuplicatingFileSystem());
+
+ final boolean canFastDuplicate =
+ stateToolset.canFastDuplicate(
+ new FileStateHandle(new Path("old-test-path", "test-file"), 0));
+ assertThat(canFastDuplicate).isTrue();
+ }
+
+ @Test
+ void testCannotDuplicate() throws IOException {
+ final FsCheckpointStateToolset stateToolset =
+ new FsCheckpointStateToolset(
+ new Path("test-path"), new TestDuplicatingFileSystem());
+
+ final boolean canFastDuplicate =
+ stateToolset.canFastDuplicate(
+ new FileStateHandle(new Path("test-path", "test-file"), 0));
+ assertThat(canFastDuplicate).isFalse();
+ }
+
+ @Test
+ void testDuplicating() throws IOException {
+ final TestDuplicatingFileSystem fs = new TestDuplicatingFileSystem();
+ final FsCheckpointStateToolset stateToolset =
+ new FsCheckpointStateToolset(new Path("test-path"), fs);
+
+ final List<StreamStateHandle> duplicated =
+ stateToolset.duplicate(
+ Arrays.asList(
+ new FileStateHandle(new Path("old-test-path", "test-file1"), 0),
+ new FileStateHandle(new Path("old-test-path", "test-file2"), 0),
+ new RelativeFileStateHandle(
+ new Path("old-test-path", "test-file3"), "test-file3", 0)));
+
+ assertThat(duplicated)
+ .containsExactly(
+ new FileStateHandle(new Path("test-path", "test-file1"), 0),
+ new FileStateHandle(new Path("test-path", "test-file2"), 0),
+ new RelativeFileStateHandle(
+ new Path("test-path", "test-file3"), "test-file3", 0));
+ }
+
+ private static final class TestDuplicatingFileSystem implements DuplicatingFileSystem {
+
+ @Override
+ public boolean canFastDuplicate(Path source, Path destination) throws IOException {
+ return !source.equals(destination);
+ }
+
+ @Override
+ public void duplicate(List<CopyRequest> requests) throws IOException {}
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 7b24f3d..2c2fbdc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
@@ -26,19 +27,24 @@ import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.apache.flink.testutils.TestFileSystem;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.io.File;
+import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -280,6 +286,42 @@ public class FsCheckpointStorageAccessTest extends AbstractFileCheckpointStorage
assertTrue(fileSystem instanceof LocalFileSystem);
}
+ @Test
+ public void testNotDuplicationCheckpointStateToolset() throws Exception {
+ CheckpointStorageAccess checkpointStorage = createCheckpointStorage(randomTempPath());
+ assertThat(
+ checkpointStorage.createTaskOwnedCheckpointStateToolset(),
+ instanceOf(NotDuplicatingCheckpointStateToolset.class));
+ }
+
+ @Test
+ public void testDuplicationCheckpointStateToolset() throws Exception {
+ CheckpointStorageAccess checkpointStorage =
+ new FsCheckpointStorageAccess(
+ new TestDuplicatingFileSystem(),
+ randomTempPath(),
+ null,
+ new JobID(),
+ FILE_SIZE_THRESHOLD,
+ WRITE_BUFFER_SIZE);
+
+ assertThat(
+ checkpointStorage.createTaskOwnedCheckpointStateToolset(),
+ instanceOf(FsCheckpointStateToolset.class));
+ }
+
+ private static final class TestDuplicatingFileSystem extends TestFileSystem
+ implements DuplicatingFileSystem {
+
+ @Override
+ public boolean canFastDuplicate(Path source, Path destination) throws IOException {
+ return !source.equals(destination);
+ }
+
+ @Override
+ public void duplicate(List<CopyRequest> requests) throws IOException {}
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index 0331d5b..5ad7903 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -21,18 +21,21 @@ package org.apache.flink.runtime.state.testutils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.function.SupplierWithException;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -114,6 +117,11 @@ public class BackendForTestStream extends MemoryStateBackend {
public CheckpointStateOutputStream createTaskOwnedStateStream() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ throw new UnsupportedOperationException();
+ }
}
private static final class TestFactory
@@ -130,5 +138,18 @@ public class BackendForTestStream extends MemoryStateBackend {
CheckpointedStateScope scope) throws IOException {
return streamFactory.get();
}
+
+ @Override
+ public boolean canFastDuplicate(
+ StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
index 80f36b2..0f16c73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
@@ -21,7 +21,10 @@ package org.apache.flink.runtime.state.testutils;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import java.io.IOException;
+import java.util.List;
import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -42,4 +45,15 @@ public class TestCheckpointStreamFactory implements CheckpointStreamFactory {
CheckpointedStateScope scope) {
return supplier.get();
}
+
+ @Override
+ public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
index fb4a8a0..91e78f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.ttl.mock;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -28,9 +29,13 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+
public class MockCheckpointStorage implements CheckpointStorage {
@Override
public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
@@ -69,6 +74,19 @@ public class MockCheckpointStorage implements CheckpointStorage {
}
@Override
+ public boolean canFastDuplicate(
+ StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+ throws IOException {
+ return null;
+ }
+
+ @Override
public CheckpointMetadataOutputStream createMetadataOutputStream() {
return null;
}
@@ -99,6 +117,11 @@ public class MockCheckpointStorage implements CheckpointStorage {
public CheckpointStateOutputStream createTaskOwnedStateStream() {
return null;
}
+
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ return null;
+ }
};
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 021e340..8f16b5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -24,10 +24,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import java.io.IOException;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
/** {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO. */
@@ -86,4 +88,15 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
return blockingStream;
}
+
+ @Override
+ public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
index 3422850..8986a5c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators.sorted.state;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -73,4 +74,10 @@ class NonCheckpointingStorageAccess implements CheckpointStorageAccess {
throw new UnsupportedOperationException(
"Checkpoints are not supported in a single key state backend");
}
+
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ throw new UnsupportedOperationException(
+ "Checkpoints are not supported in a single key state backend");
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index bcb4bdd..7c96d49 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -717,6 +718,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
return delegate.createTaskOwnedStateStream();
}
+
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ return delegate.createTaskOwnedCheckpointStateToolset();
+ }
}
private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
index ef7fc8a..81a1b78 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -24,11 +24,13 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
@@ -51,7 +53,9 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.time.Duration;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -224,7 +228,26 @@ public class UnalignedCheckpointFailureHandlingITCase {
@Override
public CheckpointStreamFactory resolveCheckpointStorageLocation(
long checkpointId, CheckpointStorageLocationReference reference) {
- return ign -> new FailingOnceFsCheckpointOutputStream(path, 100, 0, failOnClose);
+ return new CheckpointStreamFactory() {
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(
+ CheckpointedStateScope scope) throws IOException {
+ return new FailingOnceFsCheckpointOutputStream(path, 100, 0, failOnClose);
+ }
+
+ @Override
+ public boolean canFastDuplicate(
+ StreamStateHandle stateHandle, CheckpointedStateScope scope) {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+ throws IOException {
+ throw new UnsupportedEncodingException();
+ }
+ };
}
@Override
@@ -233,6 +256,11 @@ public class UnalignedCheckpointFailureHandlingITCase {
}
@Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
+ return delegate.createTaskOwnedCheckpointStateToolset();
+ }
+
+ @Override
public boolean supportsHighlyAvailableStorage() {
return delegate.supportsHighlyAvailableStorage();
}