You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/01/19 07:21:55 UTC
[flink] 01/02: [refactor] Make CheckpointStateOutputStream a top level class
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72b0e9f2b0ab8478268a26845719791a7f25834c
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 12 12:30:05 2022 +0100
[refactor] Make CheckpointStateOutputStream a top level class
---
.../channel/ChannelStateCheckpointWriter.java | 2 +-
.../checkpoint/channel/ChannelStateWriterImpl.java | 7 +--
.../state/CheckpointMetadataOutputStream.java | 4 +-
.../runtime/state/CheckpointStateOutputStream.java | 72 ++++++++++++++++++++++
.../runtime/state/CheckpointStorageWorkerView.java | 3 +-
.../runtime/state/CheckpointStreamFactory.java | 50 ---------------
.../state/CheckpointStreamWithResultProvider.java | 19 +++---
...efaultOperatorStateBackendSnapshotStrategy.java | 2 +-
.../state/DuplicatingCheckpointOutputStream.java | 19 +++---
.../runtime/state/FullSnapshotAsyncWriter.java | 2 +-
.../state/KeyedStateCheckpointOutputStream.java | 3 +-
.../state/NonClosingCheckpointOutputStream.java | 7 +--
.../state/OperatorStateCheckpointOutputStream.java | 4 +-
.../runtime/state/SavepointSnapshotStrategy.java | 2 +-
.../state/StateSnapshotContextSynchronousImpl.java | 7 +--
.../filesystem/FileBasedStateOutputStream.java | 2 +-
.../filesystem/FsCheckpointStorageAccess.java | 2 +-
.../filesystem/FsCheckpointStreamFactory.java | 8 +--
.../runtime/state/heap/HeapSnapshotStrategy.java | 3 +-
.../state/memory/MemCheckpointStreamFactory.java | 1 +
.../MemoryBackendCheckpointStorageAccess.java | 2 +-
.../channel/ChannelStateCheckpointWriterTest.java | 2 +-
.../runtime/state/ChannelPersistenceITCase.java | 3 +-
.../CheckpointStreamWithResultProviderTest.java | 2 +-
.../DuplicatingCheckpointOutputStreamTest.java | 3 +-
.../KeyedStateCheckpointOutputStreamTest.java | 2 +-
.../OperatorStateOutputCheckpointStreamTest.java | 2 +-
.../state/TestCheckpointStorageWorkerView.java | 3 +-
...tingCheckpointStorageAccessCoordinatorView.java | 2 +-
.../FsCheckpointStateOutputStreamTest.java | 17 +++--
.../filesystem/FsCheckpointStorageAccessTest.java | 2 +-
.../filesystem/FsStateBackendEntropyTest.java | 2 +-
.../memory/MemoryCheckpointOutputStreamTest.java | 2 +-
.../memory/MemoryCheckpointStorageAccessTest.java | 2 +-
.../state/testutils/BackendForTestStream.java | 2 +-
.../testutils/TestCheckpointStreamFactory.java | 1 +
.../state/ttl/mock/MockCheckpointStorage.java | 4 +-
.../util/BlockerCheckpointStreamFactory.java | 1 +
.../util/BlockingCheckpointOutputStream.java | 10 ++-
.../changelog/ChangelogKeyedStateBackend.java | 27 +++++++-
.../changelog/ChangelogStateBackendTestUtils.java | 10 ++-
.../streaming/state/RocksDBStateUploader.java | 3 +-
.../streaming/state/RocksDBAsyncSnapshotTest.java | 2 +-
.../streaming/state/RocksDBStateUploaderTest.java | 32 ++++++++--
.../state/NonCheckpointingStorageAccess.java | 3 +-
.../runtime/operators/GenericWriteAheadSink.java | 4 +-
.../tasks/SubtaskCheckpointCoordinatorImpl.java | 4 +-
.../StateSnapshotContextSynchronousImplTest.java | 13 ++--
.../tasks/TaskCheckpointingBehaviourTest.java | 2 +-
.../UnalignedCheckpointFailureHandlingITCase.java | 4 +-
50 files changed, 227 insertions(+), 160 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index f84cf50..8b3a9f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 456bcab..ef8f58b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.checkpoint.channel;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
@@ -39,9 +39,8 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteReque
import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write;
/**
- * {@link ChannelStateWriter} implemented using {@link
- * CheckpointStreamFactory.CheckpointStateOutputStream CheckpointStateOutputStreams}. Internally, it
- * has by default
+ * {@link ChannelStateWriter} implemented using {@link CheckpointStateOutputStream
+ * CheckpointStateOutputStreams}. Internally, it has by default
*
* <ul>
* <li>one stream per checkpoint; having multiple streams would mean more files written and more
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
index 418107b..525e21b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
@@ -25,8 +25,8 @@ import java.io.IOException;
/**
* An output stream for checkpoint metadata.
*
- * <p>This stream is similar to the {@link CheckpointStreamFactory.CheckpointStateOutputStream}, but
- * for metadata files rather thancdata files.
+ * <p>This stream is similar to the {@link CheckpointStateOutputStream}, but for metadata files
+ * rather than data files.
*
* <p>This stream always creates a file, regardless of the amount of data written.
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java
new file mode 100644
index 0000000..4858235
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateOutputStream.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
+ *
+ * <p><b>Important:</b> When closing this stream after the successful case, you must call {@link
+ * #closeAndGetHandle()} - only that method will actually retain the resource written to. The method
+ * has the semantics of "close on success". The {@link #close()} method is supposed to remove the
+ * target resource if called before {@link #closeAndGetHandle()}, hence having the semantics of
+ * "close on failure". That way, simple try-with-resources statements automatically clean up
+ * unsuccessful partial state resources in case the writing does not complete.
+ *
+ * <p>Note: This is an abstract class and not an interface because {@link OutputStream} is an
+ * abstract class.
+ */
+@Internal
+public abstract class CheckpointStateOutputStream extends FSDataOutputStream {
+ /**
+ * Closes the stream and gets a state handle that can create an input stream producing the data
+ * written to this stream.
+ *
+ * <p>This closing must be called (also when the caller is not interested in the handle) to
+ * successfully close the stream and retain the produced resource. In contrast, the {@link
+ * #close()} method removes the target resource when called.
+ *
+ * @return A state handle that can create an input stream producing the data written to this
+ * stream.
+ * @throws IOException Thrown, if the stream cannot be closed.
+ */
+ @Nullable
+ public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+
+ /**
+ * This method should close the stream, if it has not been closed before. If this method actually
+ * closes the stream, it should delete/release the resource behind the stream, such as the file
+ * that the stream writes to.
+ *
+ * <p>The above implies that this method is intended to be the "unsuccessful close", such as
+ * when cancelling the stream writing, or when an exception occurs. Closing the stream for the
+ * successful case must go through {@link #closeAndGetHandle()}.
+ *
+ * @throws IOException Thrown, if the stream cannot be closed.
+ */
+ @Override
+ public abstract void close() throws IOException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
index 064e42b..39d4a74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
@@ -63,6 +63,5 @@ public interface CheckpointStorageWorkerView {
* @return A checkpoint state stream to the location for state owned by tasks.
* @throws IOException Thrown, if the stream cannot be opened.
*/
- CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream()
- throws IOException;
+ CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index b864711..c905438 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -18,12 +18,7 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.core.fs.FSDataOutputStream;
-
-import javax.annotation.Nullable;
-
import java.io.IOException;
-import java.io.OutputStream;
/**
* A factory for checkpoint output streams, which are used to persist data for checkpoints.
@@ -44,49 +39,4 @@ public interface CheckpointStreamFactory {
*/
CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
throws IOException;
-
- /**
- * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
- *
- * <p><b>Important:</b> When closing this stream after the successful case, you must call {@link
- * #closeAndGetHandle()} - only that method will actually retain the resource written to. The
- * method has the semantics of "close on success". The {@link #close()} method is supposed to
- * remove the target resource if called before {@link #closeAndGetHandle()}, hence having the
- * semantics of "close on failure". That way, simple try-with-resources statements automatically
- * clean up unsuccessful partial state resources in case the writing does not complete.
- *
- * <p>Note: This is an abstract class and not an interface because {@link OutputStream} is an
- * abstract class.
- */
- abstract class CheckpointStateOutputStream extends FSDataOutputStream {
-
- /**
- * Closes the stream and gets a state handle that can create an input stream producing the
- * data written to this stream.
- *
- * <p>This closing must be called (also when the caller is not interested in the handle) to
- * successfully close the stream and retain the produced resource. In contrast, the {@link
- * #close()} method removes the target resource when called.
- *
- * @return A state handle that can create an input stream producing the data written to this
- * stream.
- * @throws IOException Thrown, if the stream cannot be closed.
- */
- @Nullable
- public abstract StreamStateHandle closeAndGetHandle() throws IOException;
-
- /**
- * This method should close the stream, if has not been closed before. If this method
- * actually closes the stream, it should delete/release the resource behind the stream, such
- * as the file that the stream writes to.
- *
- * <p>The above implies that this method is intended to be the "unsuccessful close", such as
- * when cancelling the stream writing, or when an exception occurs. Closing the stream for
- * the successful case must go through {@link #closeAndGetHandle()}.
- *
- * @throws IOException Thrown, if the stream cannot be closed.
- */
- @Override
- public abstract void close() throws IOException;
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
index cd7b181..3378ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
@@ -48,7 +48,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
/** Returns the encapsulated output stream. */
@Nonnull
- CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();
+ CheckpointStateOutputStream getCheckpointOutputStream();
@Override
default void close() throws IOException {
@@ -61,10 +61,9 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
*/
class PrimaryStreamOnly implements CheckpointStreamWithResultProvider {
- @Nonnull private final CheckpointStreamFactory.CheckpointStateOutputStream outputStream;
+ @Nonnull private final CheckpointStateOutputStream outputStream;
- public PrimaryStreamOnly(
- @Nonnull CheckpointStreamFactory.CheckpointStateOutputStream outputStream) {
+ public PrimaryStreamOnly(@Nonnull CheckpointStateOutputStream outputStream) {
this.outputStream = outputStream;
}
@@ -77,7 +76,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
@Nonnull
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream() {
+ public CheckpointStateOutputStream getCheckpointOutputStream() {
return outputStream;
}
}
@@ -93,8 +92,8 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
@Nonnull private final DuplicatingCheckpointOutputStream outputStream;
public PrimaryAndSecondaryStream(
- @Nonnull CheckpointStreamFactory.CheckpointStateOutputStream primaryOut,
- CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut)
+ @Nonnull CheckpointStateOutputStream primaryOut,
+ CheckpointStateOutputStream secondaryOut)
throws IOException {
this(new DuplicatingCheckpointOutputStream(primaryOut, secondaryOut));
}
@@ -154,7 +153,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
@Nonnull CheckpointStreamFactory primaryStreamFactory)
throws IOException {
- CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+ CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
@@ -168,7 +167,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
throws IOException {
- CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+ CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
try {
@@ -179,7 +178,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
String.valueOf(UUID.randomUUID()));
Path outPath = new Path(outFile.toURI());
- CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
+ CheckpointStateOutputStream secondaryOut =
new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
index 6055836..6dc6739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
@@ -117,7 +117,7 @@ class DefaultOperatorStateBackendSnapshotStrategy
}
return (snapshotCloseableRegistry) -> {
- CheckpointStreamFactory.CheckpointStateOutputStream localOut =
+ CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(
CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(localOut);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
index 5535aeb..fd9e7e32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
@@ -35,8 +35,7 @@ import java.io.IOException;
* user. This class is used to write state for local recovery as a local (secondary) copy of the
* (primary) state snapshot that is written to a (slower but highly-available) remote filesystem.
*/
-public class DuplicatingCheckpointOutputStream
- extends CheckpointStreamFactory.CheckpointStateOutputStream {
+public class DuplicatingCheckpointOutputStream extends CheckpointStateOutputStream {
/** Default buffer size of 8KB. */
private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
@@ -48,13 +47,13 @@ public class DuplicatingCheckpointOutputStream
private int bufferIdx;
/** Primary stream for writing the checkpoint data. Failures from this stream are forwarded. */
- private final CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream;
+ private final CheckpointStateOutputStream primaryOutputStream;
/**
* Secondary stream for writing the checkpoint data. Failures from this stream are not forwarded
* until {@link #closeAndGetSecondaryHandle()}.
*/
- private final CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream;
+ private final CheckpointStateOutputStream secondaryOutputStream;
/**
* Stores a potential exception that occurred while interacting with {@link
@@ -63,15 +62,15 @@ public class DuplicatingCheckpointOutputStream
private Exception secondaryStreamException;
public DuplicatingCheckpointOutputStream(
- CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream,
- CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream)
+ CheckpointStateOutputStream primaryOutputStream,
+ CheckpointStateOutputStream secondaryOutputStream)
throws IOException {
this(primaryOutputStream, secondaryOutputStream, DEFAULT_BUFFER_SIZER);
}
public DuplicatingCheckpointOutputStream(
- CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream,
- CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream,
+ CheckpointStateOutputStream primaryOutputStream,
+ CheckpointStateOutputStream secondaryOutputStream,
int bufferSize)
throws IOException {
@@ -280,12 +279,12 @@ public class DuplicatingCheckpointOutputStream
}
@VisibleForTesting
- CheckpointStreamFactory.CheckpointStateOutputStream getPrimaryOutputStream() {
+ CheckpointStateOutputStream getPrimaryOutputStream() {
return primaryOutputStream;
}
@VisibleForTesting
- CheckpointStreamFactory.CheckpointStateOutputStream getSecondaryOutputStream() {
+ CheckpointStateOutputStream getSecondaryOutputStream() {
return secondaryOutputStream;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
index c5a7277..96a3bd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
@@ -136,7 +136,7 @@ public class FullSnapshotAsyncWriter<K>
byte[] previousValue = null;
DataOutputView kgOutView = null;
OutputStream kgOutStream = null;
- CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
+ CheckpointStateOutputStream checkpointOutputStream =
checkpointStreamWithResultProvider.getCheckpointOutputStream();
try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
index 7384fbe..a487e4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
@@ -39,8 +39,7 @@ public final class KeyedStateCheckpointOutputStream
private final KeyGroupRangeOffsets keyGroupRangeOffsets;
public KeyedStateCheckpointOutputStream(
- CheckpointStreamFactory.CheckpointStateOutputStream delegate,
- KeyGroupRange keyGroupRange) {
+ CheckpointStateOutputStream delegate, KeyGroupRange keyGroupRange) {
super(delegate);
Preconditions.checkNotNull(keyGroupRange);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
index d39b198..76530b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
@@ -34,11 +34,10 @@ import java.io.OutputStream;
public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHandle>
extends OutputStream {
- protected final CheckpointStreamFactory.CheckpointStateOutputStream delegate;
+ protected final CheckpointStateOutputStream delegate;
private final ResourceGuard resourceGuard = new ResourceGuard();
- public NonClosingCheckpointOutputStream(
- CheckpointStreamFactory.CheckpointStateOutputStream delegate) {
+ public NonClosingCheckpointOutputStream(CheckpointStateOutputStream delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}
@@ -79,7 +78,7 @@ public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHand
}
/** This method should not be public so as to not expose internals to user code. */
- CheckpointStreamFactory.CheckpointStateOutputStream getDelegate() {
+ CheckpointStateOutputStream getDelegate() {
return delegate;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
index 34d700a..5f46187 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
@@ -33,8 +33,8 @@ public final class OperatorStateCheckpointOutputStream
private LongArrayList partitionOffsets;
private final long initialPosition;
- public OperatorStateCheckpointOutputStream(
- CheckpointStreamFactory.CheckpointStateOutputStream delegate) throws IOException {
+ public OperatorStateCheckpointOutputStream(CheckpointStateOutputStream delegate)
+ throws IOException {
super(delegate);
this.partitionOffsets = new LongArrayList(16);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java
index 126150b..114bf97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointSnapshotStrategy.java
@@ -85,7 +85,7 @@ public class SavepointSnapshotStrategy<K>
static CheckpointStreamWithResultProvider createSimpleStream(
@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {
- CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+ CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(
CheckpointedStateScope.EXCLUSIVE);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index a6b281b..4df7015 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -94,9 +94,8 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
return checkpointTimestamp;
}
- private CheckpointStreamFactory.CheckpointStateOutputStream openAndRegisterNewStream()
- throws Exception {
- CheckpointStreamFactory.CheckpointStateOutputStream cout =
+ private CheckpointStateOutputStream openAndRegisterNewStream() throws Exception {
+ CheckpointStateOutputStream cout =
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
closableRegistry.registerCloseable(cout);
@@ -160,7 +159,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
Preconditions.checkNotNull(stream);
- CheckpointStreamFactory.CheckpointStateOutputStream delegate = stream.getDelegate();
+ CheckpointStateOutputStream delegate = stream.getDelegate();
if (closableRegistry.unregisterCloseable(delegate)) {
delegate.close();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
index 7021ea50..12f0ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index a33c47b..1c3ebb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import javax.annotation.Nullable;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 8069bdd..5151130 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -161,11 +162,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
// ------------------------------------------------------------------------
/**
- * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
- * returns a {@link StreamStateHandle} upon closing.
+ * A {@link CheckpointStateOutputStream} that writes into a file and returns a {@link
+ * StreamStateHandle} upon closing.
*/
- public static class FsCheckpointStateOutputStream
- extends CheckpointStreamFactory.CheckpointStateOutputStream {
+ public static class FsCheckpointStateOutputStream extends CheckpointStateOutputStream {
private final byte[] writeBuffer;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 78b96e5..8b2bc3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -140,7 +141,7 @@ class HeapSnapshotStrategy<K>
snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
- final CheckpointStreamFactory.CheckpointStateOutputStream localStream =
+ final CheckpointStateOutputStream localStream =
streamWithResultProvider.getCheckpointOutputStream();
final DataOutputViewStreamWrapper outView =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 5f36a42..98d09be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.memory;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
index 5ff029d..10e33a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index 28b81d7..29cd71f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelSta
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index 71fbdfd..8363e2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -268,8 +268,7 @@ public class ChannelPersistenceITCase {
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream
- createTaskOwnedStateStream() {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() {
throw new UnsupportedOperationException();
}
};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
index 915c9bf..4e928c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java
@@ -183,7 +183,7 @@ public class CheckpointStreamWithResultProviderTest extends TestLogger {
private SnapshotResult<StreamStateHandle> writeCheckpointTestData(
CheckpointStreamWithResultProvider resultProvider) throws IOException {
- CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
+ CheckpointStateOutputStream checkpointOutputStream =
resultProvider.getCheckpointOutputStream();
checkpointOutputStream.write(0x42);
return resultProvider.closeAndFinalizeCheckpointStreamResult();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java
index 9fa30de..0b09655e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java
@@ -285,8 +285,7 @@ public class DuplicatingCheckpointOutputStreamTest extends TestLogger {
}
/** Stream that throws {@link IOException} on all relevant methods under test. */
- private static class FailingCheckpointOutStream
- extends CheckpointStreamFactory.CheckpointStateOutputStream {
+ private static class FailingCheckpointOutStream extends CheckpointStateOutputStream {
private boolean closed = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java
index a6114ba..31f46f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java
@@ -33,7 +33,7 @@ public class KeyedStateCheckpointOutputStreamTest {
private static final int STREAM_CAPACITY = 128;
private static KeyedStateCheckpointOutputStream createStream(KeyGroupRange keyGroupRange) {
- CheckpointStreamFactory.CheckpointStateOutputStream checkStream =
+ CheckpointStateOutputStream checkStream =
new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
return new KeyedStateCheckpointOutputStream(checkStream, keyGroupRange);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java
index d92c526..80af8e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateOutputCheckpointStreamTest.java
@@ -35,7 +35,7 @@ public class OperatorStateOutputCheckpointStreamTest {
private static final int STREAM_CAPACITY = 128;
private static OperatorStateCheckpointOutputStream createStream() throws IOException {
- CheckpointStreamFactory.CheckpointStateOutputStream checkStream =
+ CheckpointStateOutputStream checkStream =
new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
return new OperatorStateCheckpointOutputStream(checkStream);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
index 95b0248..9b8d298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
@@ -49,8 +49,7 @@ public class TestCheckpointStorageWorkerView implements CheckpointStorageWorkerV
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream()
- throws IOException {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
return taskOwnedCheckpointStreamFactory.createCheckpointStateOutputStream(
taskOwnedStateScope);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
index ff674b9..9e05949 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
@@ -93,7 +93,7 @@ public class TestingCheckpointStorageAccessCoordinatorView
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() {
return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index bf2c264..472c35c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -22,8 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -90,7 +89,7 @@ public class FsCheckpointStateOutputStreamTest {
@Test
public void testEmptyState() throws Exception {
- FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
+ CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
Path.fromLocalFile(tempDir.newFolder()),
FileSystem.getLocalFileSystem(),
@@ -124,7 +123,7 @@ public class FsCheckpointStateOutputStreamTest {
@Test
public void testGetPos() throws Exception {
- FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
+ CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
Path.fromLocalFile(tempDir.newFolder()),
FileSystem.getLocalFileSystem(),
@@ -172,7 +171,7 @@ public class FsCheckpointStateOutputStreamTest {
when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class)))
.thenReturn(outputStream);
- CheckpointStreamFactory.CheckpointStateOutputStream stream =
+ CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
Path.fromLocalFile(tempDir.newFolder()), fs, 4, 0, relativePaths);
@@ -198,7 +197,7 @@ public class FsCheckpointStateOutputStreamTest {
.thenReturn(outputStream);
doThrow(new IOException("Test IOException.")).when(outputStream).close();
- CheckpointStreamFactory.CheckpointStateOutputStream stream =
+ CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
Path.fromLocalFile(tempDir.newFolder()), fs, 4, 0, relativePaths);
@@ -219,7 +218,7 @@ public class FsCheckpointStateOutputStreamTest {
private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile)
throws Exception {
- FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
+ CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
Path.fromLocalFile(tempDir.newFolder()),
FileSystem.getLocalFileSystem(),
@@ -335,13 +334,13 @@ public class FsCheckpointStateOutputStreamTest {
// use with try-with-resources
StreamStateHandle handle4;
- try (CheckpointStreamFactory.CheckpointStateOutputStream stream4 = factory.get()) {
+ try (CheckpointStateOutputStream stream4 = factory.get()) {
stream4.write(state4);
handle4 = stream4.closeAndGetHandle();
}
// close before accessing handle
- CheckpointStreamFactory.CheckpointStateOutputStream stream5 = factory.get();
+ CheckpointStateOutputStream stream5 = factory.get();
stream5.write(state4);
stream5.close();
try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 77cb9b6..7b24f3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index a31a09c..8a4cf38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.junit.Rule;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java
index 07d43cd..63dc7a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointOutputStreamTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.state.memory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java
index aef780d..bebe5da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.state.memory;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageAccessTestBase;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index 9c57d0f..0331d5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.state.testutils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
index a4e1efb..80f36b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.testutils;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
index a71d9bc..fb4a8a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl.mock;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -95,8 +96,7 @@ public class MockCheckpointStorage implements CheckpointStorage {
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream
- createTaskOwnedStateStream() {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() {
return null;
}
};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 66e8b25..021e340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
index 00aac67..e6217c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.util;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.IOUtils;
@@ -35,8 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* until the stream is closed. This is typically used to test that a blocking read can be
* interrupted / closed.
*/
-public class BlockingCheckpointOutputStream
- extends CheckpointStreamFactory.CheckpointStateOutputStream {
+public class BlockingCheckpointOutputStream extends CheckpointStateOutputStream {
/** Optional delegate stream to which all ops are forwarded. */
private final FSDataOutputStream delegate;
@@ -192,10 +191,9 @@ public class BlockingCheckpointOutputStream
throw new IOException("Stream was already closed!");
}
- if (delegate instanceof CheckpointStreamFactory.CheckpointStateOutputStream) {
+ if (delegate instanceof CheckpointStateOutputStream) {
StreamStateHandle streamStateHandle =
- ((CheckpointStreamFactory.CheckpointStateOutputStream) delegate)
- .closeAndGetHandle();
+ ((CheckpointStateOutputStream) delegate).closeAndGetHandle();
unblockAll();
return streamStateHandle;
} else {
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index cf08092..9fc4f56 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -29,10 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
@@ -45,6 +47,7 @@ import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInf
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
@@ -217,7 +220,29 @@ public class ChangelogKeyedStateBackend<K>
this.stateChangelogWriter = stateChangelogWriter;
this.changelogStates = new HashMap<>();
this.changelogSnapshotState = completeRestore(initialState);
- this.streamFactory = shared -> checkpointStorageWorkerView.createTaskOwnedStateStream();
+ this.streamFactory =
+ new CheckpointStreamFactory() {
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(
+ CheckpointedStateScope scope) throws IOException {
+ return checkpointStorageWorkerView.createTaskOwnedStateStream();
+ }
+
+ @Override
+ public boolean canFastDuplicate(
+ StreamStateHandle stateHandle, CheckpointedStateScope scope)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+ throws IOException {
+ return null;
+ }
+ };
this.closer.register(keyedStateBackend);
}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index 4672c7c..bed4773 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -398,7 +400,13 @@ public class ChangelogStateBackendTestUtils {
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() {
+ throw new UnsupportedOperationException(
+ "Checkpoints are not supported in a single key state backend");
+ }
+
+ @Override
+ public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
throw new UnsupportedOperationException(
"Checkpoints are not supported in a single key state backend");
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
index 09b4ea0..68d1ed9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
@@ -19,6 +19,7 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StateHandleID;
@@ -115,7 +116,7 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer {
throws IOException {
InputStream inputStream = null;
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+ CheckpointStateOutputStream outputStream = null;
try {
final byte[] buffer = new byte[READ_BUFFER_SIZE];
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 6ea1da5..f61394b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -40,8 +40,8 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
index 8ef843a..434976d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StateHandleID;
@@ -42,6 +43,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -59,10 +61,30 @@ public class RocksDBStateUploaderTest extends TestLogger {
SpecifiedException expectedException =
new SpecifiedException("throw exception while multi thread upload states.");
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream =
+ CheckpointStateOutputStream outputStream =
createFailingCheckpointStateOutputStream(expectedException);
CheckpointStreamFactory checkpointStreamFactory =
- (CheckpointedStateScope scope) -> outputStream;
+ new CheckpointStreamFactory() {
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(
+ CheckpointedStateScope scope) throws IOException {
+ return outputStream;
+ }
+
+ @Override
+ public boolean canFastDuplicate(
+ StreamStateHandle stateHandle, CheckpointedStateScope scope)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public List<StreamStateHandle> duplicate(
+ List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
+ throws IOException {
+ return null;
+ }
+ };
File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
generateRandomFileContent(file.getPath(), 20);
@@ -119,9 +141,9 @@ public class RocksDBStateUploaderTest extends TestLogger {
}
}
- private CheckpointStreamFactory.CheckpointStateOutputStream
- createFailingCheckpointStateOutputStream(IOException failureException) {
- return new CheckpointStreamFactory.CheckpointStateOutputStream() {
+ private CheckpointStateOutputStream createFailingCheckpointStateOutputStream(
+ IOException failureException) {
+ return new CheckpointStateOutputStream() {
@Nullable
@Override
public StreamStateHandle closeAndGetHandle() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
index 0de86e1..3422850 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.sorted.state;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -68,7 +69,7 @@ class NonCheckpointingStorageAccess implements CheckpointStorageAccess {
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() {
throw new UnsupportedOperationException(
"Checkpoints are not supported in a single key state backend");
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4627c7d..a93c35d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -25,8 +25,8 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -69,7 +69,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
private final CheckpointCommitter committer;
protected final TypeSerializer<IN> serializer;
- private transient CheckpointStreamFactory.CheckpointStateOutputStream out;
+ private transient CheckpointStateOutputStream out;
private transient CheckpointStorageWorkerView checkpointStorage;
private transient ListState<PendingCheckpoint> checkpointedState;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index c8cc362..bcb4bdd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -713,8 +714,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream()
- throws IOException {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
return delegate.createTaskOwnedStateStream();
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 9a7f04b..7dd7f32 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -83,10 +84,8 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 =
- mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 =
- mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStateOutputStream outputStream1 = mock(CheckpointStateOutputStream.class);
+ CheckpointStateOutputStream outputStream2 = mock(CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
@@ -129,10 +128,8 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 =
- mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 =
- mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStateOutputStream outputStream1 = mock(CheckpointStateOutputStream.class);
+ CheckpointStateOutputStream outputStream2 = mock(CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 598bf52..517d632 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -55,8 +55,8 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.OperatorStateBackend;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
index 15252c7..ef7fc8a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -227,8 +228,7 @@ public class UnalignedCheckpointFailureHandlingITCase {
}
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream()
- throws IOException {
+ public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
return delegate.createTaskOwnedStateStream();
}