You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:04 UTC

[flink] branch master updated (6f64213 -> 939625f)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6f64213  [FLINK-15629] Unify DefaultSchedulerBatchSchedulingTest with BatchSchedulingTestBase
     new 182e6af  [FLINK-5763][state backends] Do not create local stream for savepoint of heapstatebackend
     new c5583c7  [FLINK-5763][state backends] Make savepoint self-contained and relocatable
     new fd0946a  [hotfix][state backends] Minor code-style cleanups
     new 5bd71c3  [FLINK-5763][state backends] (follow-up) Pull scope and relative/absolute path decision out of FsCheckpointStateOutputStream
     new 21f8234  [hotfix][tests] Make FsCheckpointStateOutputStreamTest work on Windows OS
     new d6bc0d5  [FLINK-5763][state backends] (follow-up) Rework MetaData Serializers and externalPointer passing
     new 4ee41be  [hotfix][tests] Add some clarifying comment in MetadataV3SerializerTest
     new 3ba9b38  [FLINK-17694][core] Fix argument check for SimpleVersionedSerialization
     new 1cceb3e  [FLINK-10740][DataStream API] Add a utility SimpleVersionedListState
     new 61d0344  [FLINK-10740][DataStream API] Simplify SourceOperator by using SimpleVersionedListState
     new 1e4880c  [hotfix][DataStream API] Minor formatting/warnings fixes in the SourceOperator and StreamOperator code.
     new 4c723c5  [hotfix][DataStream API] Fix style and warnings for CollectSinkOperatorFactory
     new 1c9ed3d  [FLINK-17696][streaming runtime] Add CoordinatorEventDispatcher to StreamOperatorParameters
     new 939625f  [FLINK-17699][DataStream API] Initialize SourceOperator more eagerly and reduce scope of collaborators.

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/core/fs/EntropyInjector.java  |   4 +
 .../core/io/SimpleVersionedSerialization.java      |   2 +-
 .../java/org/apache/flink/util/CollectionUtil.java |  17 ++
 .../apache/flink/core/fs/EntropyInjectorTest.java  |   7 +
 .../core/io/SimpleVersionedSerializationTest.java  |  49 ++--
 .../flink/state/api/runtime/SavepointLoader.java   |   2 +-
 .../flink/runtime/checkpoint/Checkpoints.java      |   8 +-
 .../metadata/ChannelStateHandleSerializer.java     |  22 +-
 .../checkpoint/metadata/MetadataSerializer.java    |   3 +-
 .../checkpoint/metadata/MetadataV1Serializer.java  |   2 +-
 .../checkpoint/metadata/MetadataV2Serializer.java  |  14 +-
 .../metadata/MetadataV2V3SerializerBase.java       | 268 ++++++++++++++-------
 .../checkpoint/metadata/MetadataV3Serializer.java  |  80 +++++-
 .../coordination/OperatorEventDispatcher.java      |  16 +-
 .../filesystem/AbstractFsCheckpointStorage.java    |   2 +-
 .../runtime/state/filesystem/FileStateHandle.java  |   4 +-
 .../state/filesystem/FsCheckpointStorage.java      |   2 +
 .../filesystem/FsCheckpointStreamFactory.java      |  37 ++-
 .../FsCompletedCheckpointStorageLocation.java      |   4 +
 .../state/filesystem/RelativeFileStateHandle.java  |  73 ++++++
 .../runtime/state/heap/HeapSnapshotStrategy.java   |   2 +-
 .../metadata/CheckpointMetadataTest.java           |   2 +-
 .../checkpoint/metadata/CheckpointTestUtils.java   |  51 ++--
 .../metadata/MetadataV3SerializerTest.java         |  71 +++++-
 .../IncrementalRemoteKeyedStateHandleTest.java     |   2 +-
 .../FsCheckpointStateOutputStreamTest.java         |  44 +++-
 .../filesystem/FsCheckpointStreamFactoryTest.java  | 113 +++++++++
 .../filesystem/FsStateBackendEntropyTest.java      |   2 +-
 .../api/operators/AbstractStreamOperator.java      |   6 +-
 .../api/operators/CoordinatedOperatorFactory.java  |  12 -
 .../streaming/api/operators/SourceOperator.java    | 122 ++++------
 .../api/operators/SourceOperatorFactory.java       |  65 +++--
 .../api/operators/StreamOperatorFactoryUtil.java   |  11 +-
 .../api/operators/StreamOperatorParameters.java    |  10 +-
 .../collect/CollectSinkOperatorFactory.java        |  26 +-
 .../operators/util/SimpleVersionedListState.java   | 142 +++++++++++
 .../runtime/tasks/OperatorEventDispatcherImpl.java |  14 +-
 .../api/operators/SourceOperatorTest.java          |  63 +++--
 .../StreamTaskStateInitializerImplTest.java        |   8 +-
 .../flink/streaming/util/OperatorSnapshotUtil.java |  22 +-
 .../flink/test/checkpointing/SavepointITCase.java  |  27 ++-
 41 files changed, 1044 insertions(+), 387 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/util/SimpleVersionedListState.java


[flink] 01/14: [FLINK-5763][state backends] Do not create local stream for savepoint of heapstatebackend

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 182e6affc631046cc2e4e6a6060a1fb8902e1601
Author: klion26 <qc...@gmail.com>
AuthorDate: Sat Apr 25 20:24:24 2020 +0800

    [FLINK-5763][state backends] Do not create local stream for savepoint of heapstatebackend
---
 .../java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 2ca9f5e..fb17757 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -140,7 +140,7 @@ class HeapSnapshotStrategy<K>
 
 		final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
 
-			localRecoveryConfig.isLocalRecoveryEnabled() ?
+			localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ?
 
 				() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
 					checkpointId,


[flink] 07/14: [hotfix][tests] Add some clarifying comment in MetadataV3SerializerTest

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ee41bebc504880c7224ec29ffdd4cf28f5fc218
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat May 16 01:57:46 2020 +0200

    [hotfix][tests] Add some clarifying comment in MetadataV3SerializerTest
---
 .../metadata/MetadataV3SerializerTest.java         | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
index e5e9dbd..4a78321 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
@@ -49,7 +49,7 @@ import static org.junit.Assert.assertEquals;
 public class MetadataV3SerializerTest {
 
 	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@Test
 	public void testCheckpointWithNoState() throws Exception {
@@ -163,13 +163,20 @@ public class MetadataV3SerializerTest {
 
 		CheckpointMetadata metadata = new CheckpointMetadata(checkpointId, operatorStates, masterStates);
 		MetadataV3Serializer.serialize(metadata, out);
-		Path metaPath = null;
-		// add this because we need to resolve the checkpoint pointer in MetadataV2V3SerializerBase.
+		out.close();
+
+		// The relative pointer resolution in MetadataV2V3SerializerBase currently runs the same
+		// code as the file system checkpoint location resolution. Because of that, it needs
+		// a "_metadata" file present. We could change the code to resolve the pointer without doing
+		// file I/O, but it is somewhat delicate to reproduce that logic without I/O and the same guarantees
+		// to differentiate between the supported options of directory addressing and metadata file addressing.
+		// So, better safe than sorry, we do actually do the file system operations in the serializer for now,
+		// even if it makes the tests a tad bit more clumsy.
 		if (basePath != null) {
-			metaPath = new Path(basePath, "_metadata");
-			metaPath.getFileSystem().create(metaPath, FileSystem.WriteMode.OVERWRITE);
+			final Path metaPath = new Path(basePath, "_metadata");
+			// this is in the temp folder, so it will get automatically deleted
+			FileSystem.getLocalFileSystem().create(metaPath, FileSystem.WriteMode.OVERWRITE).close();
 		}
-		out.close();
 
 		byte[] bytes = baos.toByteArray();
 
@@ -183,8 +190,5 @@ public class MetadataV3SerializerTest {
 				a.hasNext();) {
 			CheckpointTestUtils.assertMasterStateEquality(a.next(), b.next());
 		}
-		if (metaPath != null) {
-			metaPath.getFileSystem().delete(metaPath, true);
-		}
 	}
 }


[flink] 11/14: [hotfix][DataStream API] Minor formatting/warnings fixes in the SourceOperator and StreamOperator code.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e4880cdd27c26d2b925a3d108178844f7fa95ca
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 21:12:41 2020 +0200

    [hotfix][DataStream API] Minor formatting/warnings fixes in the SourceOperator and StreamOperator code.
---
 .../flink/streaming/api/operators/AbstractStreamOperator.java     | 6 ++----
 .../org/apache/flink/streaming/api/operators/SourceOperator.java  | 6 +++++-
 .../flink/streaming/api/operators/SourceOperatorFactory.java      | 5 +++++
 .../apache/flink/streaming/api/operators/SourceOperatorTest.java  | 8 ++++++--
 4 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 61925f4..8a4d215 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -156,7 +156,7 @@ public abstract class AbstractStreamOperator<OUT>
 		this.config = config;
 		try {
 			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
-			this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+			this.output = new CountingOutput<>(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
 			if (config.isChainStart()) {
 				operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
 			}
@@ -394,7 +394,6 @@ public abstract class AbstractStreamOperator<OUT>
 		return runtimeContext;
 	}
 
-	@SuppressWarnings("unchecked")
 	@VisibleForTesting
 	public <K> KeyedStateBackend<K> getKeyedStateBackend() {
 		return stateHandler.getKeyedStateBackend();
@@ -462,12 +461,10 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 	}
 
-	@SuppressWarnings({"unchecked", "rawtypes"})
 	public void setCurrentKey(Object key) {
 		stateHandler.setCurrentKey(key);
 	}
 
-	@SuppressWarnings({"unchecked", "rawtypes"})
 	public Object getCurrentKey() {
 		return stateHandler.getCurrentKey();
 	}
@@ -551,6 +548,7 @@ public abstract class AbstractStreamOperator<OUT>
 		if (timeServiceManager == null) {
 			throw new RuntimeException("The timer service has not been initialized.");
 		}
+		@SuppressWarnings("unchecked")
 		InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
 		return keyedTimeServiceHandler.getInternalTimerService(
 			name,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 4b17b0d..3af714d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -51,14 +51,18 @@ import java.util.concurrent.CompletableFuture;
  * the interface of {@link PushingAsyncDataInput} for naturally compatible with one input processing in runtime
  * stack.
  *
- * <p>Note: We are expecting this to be changed to the concrete class once SourceReader interface is introduced.
+ * <p><b>Important Note on Serialization:</b> The SourceOperator inherits the {@link java.io.Serializable}
+ * interface from the StreamOperator, but is in fact NOT serializable. The operator must only be instantiated
+ * in the StreamTask from its factory.
  *
  * @param <OUT> The output type of the operator.
  */
 @Internal
+@SuppressWarnings("serial")
 public class SourceOperator<OUT, SplitT extends SourceSplit>
 		extends AbstractStreamOperator<OUT>
 		implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
+
 	// Package private for unit test.
 	static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
 			new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 4632b75..79f6453 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -30,10 +30,15 @@ import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
  */
 public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
 		implements CoordinatedOperatorFactory<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
 	/** The {@link Source} to create the {@link SourceOperator}. */
 	private final Source<OUT, ?, ?> source;
+
 	/** The number of worker thread for the source coordinator. */
 	private final int numCoordinatorWorkerThread;
+
 	/** The {@link OperatorEventDispatcher} to register the SourceOperator. */
 	private OperatorEventDispatcher operatorEventDispatcher;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 453b083..653af8b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -58,10 +58,13 @@ import static org.junit.Assert.assertTrue;
 /**
  * Unit test for {@link SourceOperator}.
  */
+@SuppressWarnings("serial")
 public class SourceOperatorTest {
+
 	private static final int NUM_SPLITS = 5;
 	private static final int SUBTASK_INDEX = 1;
 	private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
+
 	private MockSource source;
 	private MockOperatorEventGateway mockGateway;
 	private SourceOperator<Integer, MockSourceSplit> operator;
@@ -176,8 +179,9 @@ public class SourceOperatorTest {
 	/**
 	 * A testing class that overrides the getRuntimeContext() Method.
 	 */
-	private static class TestingSourceOperator<OUT, SplitT extends SourceSplit> extends
-																				SourceOperator<OUT, SplitT> {
+	private static class TestingSourceOperator<OUT, SplitT extends SourceSplit>
+			extends SourceOperator<OUT, SplitT> {
+
 		private final int subtaskIndex;
 
 		TestingSourceOperator(Source<OUT, SplitT, ?> source, int subtaskIndex) {


[flink] 12/14: [hotfix][DataStream API] Fix style and warnings for CollectSinkOperatorFactory

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c723c59db2e8958dabecdf93aa34289feeacf0e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat May 16 14:39:07 2020 +0200

    [hotfix][DataStream API] Fix style and warnings for CollectSinkOperatorFactory
    
    In particular, avoid raw types and do not generate compiler warnings, as per the Flink
    coding style guide.
---
 .../api/operators/collect/CollectSinkOperatorFactory.java      | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
index f43b9cf..7e9da6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
@@ -32,23 +32,25 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 @SuppressWarnings("unchecked")
 public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<Object> implements CoordinatedOperatorFactory<Object> {
 
-	private final CollectSinkOperator operator;
+	private static final long serialVersionUID = 1L;
+
+	private final CollectSinkOperator<?> operator;
 
 	private OperatorEventDispatcher operatorEventDispatcher;
 
-	public CollectSinkOperatorFactory(CollectSinkOperator operator) {
+	public CollectSinkOperatorFactory(CollectSinkOperator<?> operator) {
 		super(operator);
 		this.operator = operator;
 	}
 
 	@Override
-	public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
+	public <T extends StreamOperator<Object>> T  createStreamOperator(StreamOperatorParameters<Object> parameters) {
 		OperatorEventGateway operatorEventGateway = operatorEventDispatcher.registerEventHandler(
 			parameters.getStreamConfig().getOperatorID(),
 			operator);
 		operator.setOperatorEventGateway(operatorEventGateway);
 		operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
-		return operator;
+		return (T) operator;
 	}
 
 	@Override


[flink] 06/14: [FLINK-5763][state backends] (follow-up) Rework MetaData Serializers and externalPointer passing

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6bc0d5b845f7ade465e69c30e07be2947bb4b5e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat May 16 01:16:50 2020 +0200

    [FLINK-5763][state backends] (follow-up) Rework MetaData Serializers and externalPointer passing
    
    This fix has several goals:
    
    (1) Change the pointer / path resolution parsing from a static variable to a parameter that is passed.
    The serializers are singleton instances and are assumed to be usable in a multi-threaded manner.
    The static variables prevent this; using the context object parameter restores this behavior.
    
    (2) Lower level serialization methods should not expose themselves directly to the tests.
    This change makes (almost) all lower level serialization methods instance methods and package-private.
    Static access methods are gathered in one place, as a workaround for tests that require access to
    those lower level methods (even though they should not).
    
    (3) With more unified access to the methods from tests, we can now avoid requiring tests to
    be aware of the context object or external pointer parameter.
    
    (4) Minor cosmetic cleanups around method grouping.
---
 .../org/apache/flink/core/fs/EntropyInjector.java  |   2 +-
 .../metadata/ChannelStateHandleSerializer.java     |  22 +-
 .../checkpoint/metadata/MetadataV2Serializer.java  |  10 +-
 .../metadata/MetadataV2V3SerializerBase.java       | 266 +++++++++++++--------
 .../checkpoint/metadata/MetadataV3Serializer.java  |  76 +++++-
 .../filesystem/AbstractFsCheckpointStorage.java    |   2 +-
 .../flink/streaming/util/OperatorSnapshotUtil.java |  24 +-
 7 files changed, 267 insertions(+), 135 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index 1f67b34..e4e2078 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -94,7 +94,7 @@ public class EntropyInjector {
 	}
 
 	@Nullable
-	public static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
+	private static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
 		if (fs instanceof EntropyInjectingFileSystem) {
 			return (EntropyInjectingFileSystem) fs;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
index f7b2c8f..30a4a30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
@@ -45,11 +45,15 @@ class ChannelStateHandleSerializer {
 		});
 	}
 
-	ResultSubpartitionStateHandle deserializeResultSubpartitionStateHandle(DataInputStream dis) throws IOException {
+	ResultSubpartitionStateHandle deserializeResultSubpartitionStateHandle(
+			DataInputStream dis,
+			MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
+
 		return deserializeChannelStateHandle(
 			is -> new ResultSubpartitionInfo(is.readInt(), is.readInt()),
 			(streamStateHandle, longs, info) -> new ResultSubpartitionStateHandle(info, streamStateHandle, longs),
-			dis);
+			dis,
+			context);
 	}
 
 	public void serialize(InputChannelStateHandle handle, DataOutputStream dos) throws IOException {
@@ -59,11 +63,15 @@ class ChannelStateHandleSerializer {
 		});
 	}
 
-	InputChannelStateHandle deserializeInputChannelStateHandle(DataInputStream dis) throws IOException {
+	InputChannelStateHandle deserializeInputChannelStateHandle(
+			DataInputStream dis,
+			MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
+
 		return deserializeChannelStateHandle(
 			is -> new InputChannelInfo(is.readInt(), is.readInt()),
 			(streamStateHandle, longs, inputChannelInfo) -> new InputChannelStateHandle(inputChannelInfo, streamStateHandle, longs),
-			dis);
+			dis,
+			context);
 	}
 
 	private static <I> void serializeChannelStateHandle(
@@ -81,13 +89,15 @@ class ChannelStateHandleSerializer {
 	private static <Info, Handle extends AbstractChannelStateHandle<Info>> Handle deserializeChannelStateHandle(
 			FunctionWithException<DataInputStream, Info, IOException> infoReader,
 			TriFunctionWithException<StreamStateHandle, List<Long>, Info, Handle, IOException> handleBuilder,
-			DataInputStream dis) throws IOException {
+			DataInputStream dis,
+			MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
+
 		final Info info = infoReader.apply(dis);
 		int offsetsSize = dis.readInt();
 		final List<Long> offsets = new ArrayList<>(offsetsSize);
 		for (int i = 0; i < offsetsSize; i++) {
 			offsets.add(dis.readLong());
 		}
-		return handleBuilder.apply(deserializeStreamStateHandle(dis, null), offsets, info);
+		return handleBuilder.apply(deserializeStreamStateHandle(dis, context), offsets, info);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
index e02505a..fdf4392 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
+import javax.annotation.Nullable;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -89,7 +91,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorState deserializeOperatorState(DataInputStream dis, String externalPointer) throws IOException {
+	protected OperatorState deserializeOperatorState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
 		final OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
 		final int parallelism = dis.readInt();
 		final int maxParallelism = dis.readInt();
@@ -106,7 +108,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 
 		for (int j = 0; j < numSubTaskStates; j++) {
 			final int subtaskIndex = dis.readInt();
-			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, externalPointer);
+			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, context);
 			taskState.putState(subtaskIndex, subtaskState);
 		}
 
@@ -125,7 +127,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, String externalPointer) throws IOException {
+	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
 		// read two unused fields for compatibility:
 		//   - "duration"
 		//   - number of legacy states
@@ -138,6 +140,6 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 					"no longer supported.");
 		}
 
-		return super.deserializeSubtaskState(dis, externalPointer);
+		return super.deserializeSubtaskState(dis, context);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 2ea18be..fe8343a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint.metadata;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -39,11 +38,10 @@ import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.function.BiConsumerWithException;
-import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
 
 import javax.annotation.Nullable;
 
@@ -61,8 +59,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * Base (De)serializer for checkpoint metadata format version 2 and 3.
  *
@@ -95,9 +91,6 @@ public abstract class MetadataV2V3SerializerBase {
 	private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
 	private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
 
-	private static Path exclusiveCheckpointDir = null;
-	private static String previousExternalPointer = null;
-
 	// ------------------------------------------------------------------------
 	//  (De)serialization entry points
 	// ------------------------------------------------------------------------
@@ -122,7 +115,13 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	protected CheckpointMetadata deserializeMetadata(DataInputStream dis, String externalPointer) throws IOException {
+	protected CheckpointMetadata deserializeMetadata(
+			DataInputStream dis,
+			@Nullable String externalPointer) throws IOException {
+
+		final DeserializationContext context = externalPointer == null
+				? null : new DeserializationContext(externalPointer);
+
 		// first: checkpoint ID
 		final long checkpointId = dis.readLong();
 		if (checkpointId < 0) {
@@ -151,7 +150,7 @@ public abstract class MetadataV2V3SerializerBase {
 		final List<OperatorState> operatorStates = new ArrayList<>(numTaskStates);
 
 		for (int i = 0; i < numTaskStates; i++) {
-			operatorStates.add(deserializeOperatorState(dis, externalPointer));
+			operatorStates.add(deserializeOperatorState(dis, context));
 		}
 
 		return new CheckpointMetadata(checkpointId, operatorStates, masterStates);
@@ -220,15 +219,11 @@ public abstract class MetadataV2V3SerializerBase {
 
 	protected abstract void serializeOperatorState(OperatorState operatorState, DataOutputStream dos) throws IOException;
 
-	protected abstract OperatorState deserializeOperatorState(DataInputStream dis, String deserializeOperatorState) throws IOException;
-
-	// ------------------------------------------------------------------------
-	//  operator subtask state (de)serialization methods
-	// ------------------------------------------------------------------------
+	protected abstract OperatorState deserializeOperatorState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException;
 
 	protected void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException {
-		serializeSingleton(subtaskState.getManagedOperatorState(), dos, MetadataV2V3SerializerBase::serializeOperatorStateHandle);
-		serializeSingleton(subtaskState.getRawOperatorState(), dos, MetadataV2V3SerializerBase::serializeOperatorStateHandle);
+		serializeSingleton(subtaskState.getManagedOperatorState(), dos, this::serializeOperatorStateHandle);
+		serializeSingleton(subtaskState.getRawOperatorState(), dos, this::serializeOperatorStateHandle);
 		serializeKeyedStateCol(subtaskState.getManagedKeyedState(), dos);
 		serializeKeyedStateCol(subtaskState.getRawKeyedState(), dos);
 	}
@@ -237,32 +232,22 @@ public abstract class MetadataV2V3SerializerBase {
 		serializeKeyedStateHandle(extractSingleton(managedKeyedState), dos);
 	}
 
-	private <T extends StateObject> void serializeSingleton(
-			StateObjectCollection<T> stateObjectCollection,
-			DataOutputStream dos,
-			BiConsumerWithException<T, DataOutputStream, IOException> cons) throws IOException {
-		final T state = extractSingleton(stateObjectCollection);
-		if (state != null) {
-			dos.writeInt(1);
-			cons.accept(state, dos);
-		} else {
-			dos.writeInt(0);
-		}
-	}
+	protected OperatorSubtaskState deserializeSubtaskState(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
 
-	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, String externalPointer) throws IOException {
 		final boolean hasManagedOperatorState = dis.readInt() != 0;
-		final OperatorStateHandle managedOperatorState = hasManagedOperatorState ? deserializeOperatorStateHandle(dis, externalPointer) : null;
+		final OperatorStateHandle managedOperatorState = hasManagedOperatorState ? deserializeOperatorStateHandle(dis, context) : null;
 
 		final boolean hasRawOperatorState = dis.readInt() != 0;
-		final OperatorStateHandle rawOperatorState = hasRawOperatorState ? deserializeOperatorStateHandle(dis, externalPointer) : null;
+		final OperatorStateHandle rawOperatorState = hasRawOperatorState ? deserializeOperatorStateHandle(dis, context) : null;
 
-		final KeyedStateHandle managedKeyedState = deserializeKeyedStateHandle(dis, externalPointer);
-		final KeyedStateHandle rawKeyedState = deserializeKeyedStateHandle(dis, externalPointer);
+		final KeyedStateHandle managedKeyedState = deserializeKeyedStateHandle(dis, context);
+		final KeyedStateHandle rawKeyedState = deserializeKeyedStateHandle(dis, context);
 
-		StateObjectCollection<InputChannelStateHandle> inputChannelState = deserializeInputChannelStateHandle(dis);
+		StateObjectCollection<InputChannelStateHandle> inputChannelState = deserializeInputChannelStateHandle(dis, context);
 
-		StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState = deserializeResultSubpartitionStateHandle(dis);
+		StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState = deserializeResultSubpartitionStateHandle(dis, context);
 
 		return new OperatorSubtaskState(
 			managedOperatorState,
@@ -273,10 +258,11 @@ public abstract class MetadataV2V3SerializerBase {
 			resultSubpartitionState);
 	}
 
-	@VisibleForTesting
-	public static void serializeKeyedStateHandle(
-			KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+	// ------------------------------------------------------------------------
+	//  keyed state
+	// ------------------------------------------------------------------------
 
+	void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
 		if (stateHandle == null) {
 			dos.writeByte(NULL_HANDLE);
 		} else if (stateHandle instanceof KeyGroupsStateHandle) {
@@ -309,35 +295,10 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	private static void serializeStreamStateHandleMap(
-			Map<StateHandleID, StreamStateHandle> map,
-			DataOutputStream dos) throws IOException {
-
-		dos.writeInt(map.size());
-		for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
-			dos.writeUTF(entry.getKey().toString());
-			serializeStreamStateHandle(entry.getValue(), dos);
-		}
-	}
-
-	private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(
+	KeyedStateHandle deserializeKeyedStateHandle(
 			DataInputStream dis,
-			String externalPointer) throws IOException {
-
-		final int size = dis.readInt();
-		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+			@Nullable DeserializationContext context) throws IOException {
 
-		for (int i = 0; i < size; ++i) {
-			StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
-			result.put(stateHandleID, stateHandle);
-		}
-
-		return result;
-	}
-
-	@VisibleForTesting
-	public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis, String externalPointer) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 
@@ -354,7 +315,7 @@ public abstract class MetadataV2V3SerializerBase {
 			}
 			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
 				keyGroupRange, offsets);
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
 			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
 		} else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
 
@@ -365,9 +326,9 @@ public abstract class MetadataV2V3SerializerBase {
 			KeyGroupRange keyGroupRange =
 				KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
 
-			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, externalPointer);
-			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis, externalPointer);
-			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis, externalPointer);
+			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, context);
+			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis, context);
+			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis, context);
 
 			UUID uuid;
 
@@ -390,10 +351,7 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	@VisibleForTesting
-	public static void serializeOperatorStateHandle(
-		OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
+	void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
 		if (stateHandle != null) {
 			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
 			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
@@ -419,10 +377,9 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	@VisibleForTesting
-	public static OperatorStateHandle deserializeOperatorStateHandle(
+	OperatorStateHandle deserializeOperatorStateHandle(
 			DataInputStream dis,
-			String externalPointer) throws IOException {
+			@Nullable DeserializationContext context) throws IOException {
 
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
@@ -445,17 +402,44 @@ public abstract class MetadataV2V3SerializerBase {
 						new OperatorStateHandle.StateMetaInfo(offsets, mode);
 				offsetsMap.put(key, metaInfo);
 			}
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
 			return new OperatorStreamStateHandle(offsetsMap, stateHandle);
 		} else {
 			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
 		}
 	}
 
-	@VisibleForTesting
-	public static void serializeStreamStateHandle(
-			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+	// ------------------------------------------------------------------------
+	//  channel state (unaligned checkpoints)
+	// ------------------------------------------------------------------------
+
+	protected StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return StateObjectCollection.empty();
+	}
 
+	protected StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return StateObjectCollection.empty();
+	}
+
+	protected void serializeResultSubpartitionStateHandle(
+			ResultSubpartitionStateHandle resultSubpartitionStateHandle,
+			DataOutputStream dos) throws IOException {
+	}
+
+	protected void serializeInputChannelStateHandle(
+			InputChannelStateHandle inputChannelStateHandle,
+			DataOutputStream dos) throws IOException {
+	}
+
+	// ------------------------------------------------------------------------
+	//  low-level state handles
+	// ------------------------------------------------------------------------
+
+	static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
 		if (stateHandle == null) {
 			dos.writeByte(NULL_HANDLE);
 
@@ -484,7 +468,10 @@ public abstract class MetadataV2V3SerializerBase {
 		dos.flush();
 	}
 
-	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis, @Nullable String externalPointer) throws IOException {
+	static StreamStateHandle deserializeStreamStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+
 		final int type = dis.read();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -499,14 +486,12 @@ public abstract class MetadataV2V3SerializerBase {
 			dis.readFully(data);
 			return new ByteStreamStateHandle(handleName, data);
 		} else if (RELATIVE_STREAM_STATE_HANDLE == type) {
-			checkArgument(externalPointer != null, "external pointer should not be null when deserializing relative state handle.");
+			if (context == null) {
+				throw new IOException("Cannot deserialize a RelativeFileStateHandle without a context to make it relative to.");
+			}
 			String relativePath = dis.readUTF();
 			long size = dis.readLong();
-			if (exclusiveCheckpointDir == null || (!externalPointer.equals(previousExternalPointer))) {
-				exclusiveCheckpointDir = ((FsCompletedCheckpointStorageLocation) (AbstractFsCheckpointStorage.resolveCheckpointPointer(externalPointer))).getExclusiveCheckpointDir();
-				previousExternalPointer = externalPointer;
-			}
-			Path statePath = new Path(exclusiveCheckpointDir, relativePath);
+			Path statePath = new Path(context.getExclusiveDirPath(), relativePath);
 			return new RelativeFileStateHandle(statePath, relativePath, size);
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
@@ -518,7 +503,7 @@ public abstract class MetadataV2V3SerializerBase {
 	// ------------------------------------------------------------------------
 
 	@Nullable
-	static <T> T extractSingleton(Collection<T> collection) {
+	private static <T> T extractSingleton(Collection<T> collection) {
 		if (collection == null || collection.isEmpty()) {
 			return null;
 		}
@@ -530,27 +515,104 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	protected StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dis) throws IOException {
-		return StateObjectCollection.empty();
-	}
-
-	protected StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dis) throws IOException {
-		return StateObjectCollection.empty();
-	}
-
-	protected void serializeResultSubpartitionStateHandle(ResultSubpartitionStateHandle resultSubpartitionStateHandle, DataOutputStream dos) throws IOException {
+	private static <T extends StateObject> void serializeSingleton(
+			StateObjectCollection<T> stateObjectCollection,
+			DataOutputStream dos,
+			BiConsumerWithException<T, DataOutputStream, IOException> cons) throws IOException {
+		final T state = extractSingleton(stateObjectCollection);
+		if (state != null) {
+			dos.writeInt(1);
+			cons.accept(state, dos);
+		} else {
+			dos.writeInt(0);
+		}
 	}
 
-	protected void serializeInputChannelStateHandle(InputChannelStateHandle inputChannelStateHandle, DataOutputStream dos) throws IOException {
-	}
+	static <T extends StateObject> StateObjectCollection<T> deserializeCollection(
+		DataInputStream dis,
+		DeserializationContext context,
+		BiFunctionWithException<DataInputStream, DeserializationContext, T, IOException> s) throws IOException {
 
-	static <T extends StateObject> StateObjectCollection<T> deserializeCollection(DataInputStream dis, FunctionWithException<DataInputStream, T, IOException> s) throws IOException {
 		int size = dis.readInt();
 		List<T> result = new ArrayList<>();
 		for (int i = 0; i < size; i++) {
-			result.add(s.apply(dis));
+			result.add(s.apply(dis, context));
 		}
 		return new StateObjectCollection<>(result);
 	}
 
+	private static void serializeStreamStateHandleMap(
+		Map<StateHandleID, StreamStateHandle> map,
+		DataOutputStream dos) throws IOException {
+
+		dos.writeInt(map.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
+			dos.writeUTF(entry.getKey().toString());
+			serializeStreamStateHandle(entry.getValue(), dos);
+		}
+	}
+
+	private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(
+		DataInputStream dis,
+		@Nullable DeserializationContext context) throws IOException {
+
+		final int size = dis.readInt();
+		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+
+		for (int i = 0; i < size; ++i) {
+			StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
+			result.put(stateHandleID, stateHandle);
+		}
+
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A context that keeps information needed during deserialization. This context is passed
+	 * along by the methods. In some sense, this replaces the member fields of the class, because
+	 * the serializer is supposed to be "singleton stateless", and because there are multiple instances
+	 * involved (metadata serializer, channel state serializer).
+	 *
+	 * <p>The alternative to passing this context along would be to change the serializers to
+	 * work as actual instances so that they can keep the state. We might still want to do that,
+	 * but at the time of implementing this, it seems the less invasive change to use this context,
+	 * and it also works with static methods and with different serializer instances that do not know
+	 * of each other.
+	 *
+	 * <p>This context is currently hardwired to the FileSystem-based State Backends.
+	 * At the moment, this works because those are the only ones producing relative file
+	 * path handles, which are in turn the only ones needing this context.
+	 * In the future, we should refactor this, though, and make the DeserializationContext
+	 * a property of the used checkpoint storage.
+	 */
+	protected static final class DeserializationContext {
+
+		private final String externalPointer;
+
+		private Path cachedExclusiveDirPath;
+
+		DeserializationContext(String externalPointer) {
+			this.externalPointer = externalPointer;
+		}
+
+		Path getExclusiveDirPath() throws IOException {
+			if (cachedExclusiveDirPath == null) {
+				cachedExclusiveDirPath = createExclusiveDirPath(externalPointer);
+			}
+			return cachedExclusiveDirPath;
+		}
+
+		private static Path createExclusiveDirPath(String externalPointer) throws IOException {
+			try {
+				return AbstractFsCheckpointStorage.resolveCheckpointPointer(externalPointer).getExclusiveCheckpointDir();
+			} catch (IOException e) {
+				throw new IOException("Could not parse external pointer as state base path", e);
+			}
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index 3376ce7..6bf98c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -25,10 +25,15 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.function.BiConsumerWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -109,7 +114,7 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorState deserializeOperatorState(DataInputStream dis, String externalPointer) throws IOException {
+	protected OperatorState deserializeOperatorState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
 		final OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
 		final int parallelism = dis.readInt();
 		final int maxParallelism = dis.readInt();
@@ -117,14 +122,14 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		final OperatorState operatorState = new OperatorState(jobVertexId, parallelism, maxParallelism);
 
 		// Coordinator state
-		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis, externalPointer));
+		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis, context));
 
 		// Sub task states
 		final int numSubTaskStates = dis.readInt();
 
 		for (int j = 0; j < numSubTaskStates; j++) {
 			final int subtaskIndex = dis.readInt();
-			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, externalPointer);
+			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, context);
 			operatorState.putState(subtaskIndex, subtaskState);
 		}
 
@@ -139,8 +144,10 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 
 	@VisibleForTesting
 	@Override
-	public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dis) throws IOException {
-		return deserializeCollection(dis, channelStateHandleSerializer::deserializeResultSubpartitionStateHandle);
+	public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return deserializeCollection(dis, context, channelStateHandleSerializer::deserializeResultSubpartitionStateHandle);
 	}
 
 	@VisibleForTesting
@@ -151,8 +158,10 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 
 	@VisibleForTesting
 	@Override
-	public StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dis) throws IOException {
-		return deserializeCollection(dis, channelStateHandleSerializer::deserializeInputChannelStateHandle);
+	public StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return deserializeCollection(dis, context, channelStateHandleSerializer::deserializeInputChannelStateHandle);
 	}
 
 	private <T extends StateObject> void serializeCollection(
@@ -169,4 +178,57 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  exposed static methods for test cases
+	//
+	//  NOTE: The fact that certain tests directly call these lower level
+	//        serialization methods is a problem, because that way the tests
+	//        bypass the versioning scheme. Especially tests that test for
+	//        cross-version compatibility need to version themselves if we
+	//        ever break the format of these low level state types.
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+		MetadataV2V3SerializerBase.serializeStreamStateHandle(stateHandle, dos);
+	}
+
+	@VisibleForTesting
+	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+		return MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public static void serializeOperatorStateHandleUtil(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+		INSTANCE.serializeOperatorStateHandle(stateHandle, dos);
+	}
+
+	@VisibleForTesting
+	public static OperatorStateHandle deserializeOperatorStateHandleUtil(DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeOperatorStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public static void serializeKeyedStateHandleUtil(
+			KeyedStateHandle stateHandle,
+			DataOutputStream dos) throws IOException {
+		INSTANCE.serializeKeyedStateHandle(stateHandle, dos);
+	}
+
+	@VisibleForTesting
+	public static KeyedStateHandle deserializeKeyedStateHandleUtil(DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeKeyedStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public static StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(
+			DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeInputChannelStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(
+			DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeResultSubpartitionStateHandle(dis, null);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
index d7cf150..0bc0650 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
@@ -210,7 +210,7 @@ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
 	 *                     the pointer points to a location that does not seem to be a checkpoint/savepoint.
 	 */
 	@Internal
-	public static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
+	public static FsCompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
 		checkNotNull(checkpointPointer, "checkpointPointer");
 		checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 2b812ea..b7c1188 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -63,7 +63,7 @@ public class OperatorSnapshotUtil {
 			if (rawOperatorState != null) {
 				dos.writeInt(rawOperatorState.size());
 				for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
-					MetadataV3Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+					MetadataV3Serializer.serializeOperatorStateHandleUtil(operatorStateHandle, dos);
 				}
 			} else {
 				// this means no states, not even an empty list
@@ -74,7 +74,7 @@ public class OperatorSnapshotUtil {
 			if (managedOperatorState != null) {
 				dos.writeInt(managedOperatorState.size());
 				for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
-					MetadataV3Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+					MetadataV3Serializer.serializeOperatorStateHandleUtil(operatorStateHandle, dos);
 				}
 			} else {
 				// this means no states, not even an empty list
@@ -85,7 +85,7 @@ public class OperatorSnapshotUtil {
 			if (rawKeyedState != null) {
 				dos.writeInt(rawKeyedState.size());
 				for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
-					MetadataV3Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+					MetadataV3Serializer.serializeKeyedStateHandleUtil(keyedStateHandle, dos);
 				}
 			} else {
 				// this means no operator states, not even an empty list
@@ -96,7 +96,7 @@ public class OperatorSnapshotUtil {
 			if (managedKeyedState != null) {
 				dos.writeInt(managedKeyedState.size());
 				for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
-					MetadataV3Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+					MetadataV3Serializer.serializeKeyedStateHandleUtil(keyedStateHandle, dos);
 				}
 			} else {
 				// this means no operator states, not even an empty list
@@ -127,15 +127,14 @@ public class OperatorSnapshotUtil {
 			final int v = dis.readInt();
 
 			// still required for compatibility to consume the bytes.
-			MetadataV3Serializer.deserializeStreamStateHandle(dis, null);
+			MetadataV3Serializer.deserializeStreamStateHandle(dis);
 
 			List<OperatorStateHandle> rawOperatorState = null;
 			int numRawOperatorStates = dis.readInt();
 			if (numRawOperatorStates >= 0) {
 				rawOperatorState = new ArrayList<>();
 				for (int i = 0; i < numRawOperatorStates; i++) {
-					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandle(
-						dis, null);
+					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandleUtil(dis);
 					rawOperatorState.add(operatorState);
 				}
 			}
@@ -145,8 +144,7 @@ public class OperatorSnapshotUtil {
 			if (numManagedOperatorStates >= 0) {
 				managedOperatorState = new ArrayList<>();
 				for (int i = 0; i < numManagedOperatorStates; i++) {
-					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandle(
-						dis, null);
+					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandleUtil(dis);
 					managedOperatorState.add(operatorState);
 				}
 			}
@@ -156,8 +154,7 @@ public class OperatorSnapshotUtil {
 			if (numRawKeyedStates >= 0) {
 				rawKeyedState = new ArrayList<>();
 				for (int i = 0; i < numRawKeyedStates; i++) {
-					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandle(
-						dis, null);
+					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandleUtil(dis);
 					rawKeyedState.add(keyedState);
 				}
 			}
@@ -167,15 +164,14 @@ public class OperatorSnapshotUtil {
 			if (numManagedKeyedStates >= 0) {
 				managedKeyedState = new ArrayList<>();
 				for (int i = 0; i < numManagedKeyedStates; i++) {
-					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandle(
-						dis, null);
+					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandleUtil(dis);
 					managedKeyedState.add(keyedState);
 				}
 			}
 
 			final StateObjectCollection<InputChannelStateHandle> inputChannelStateHandles =
 				v == MetadataV3Serializer.VERSION ?
-					MetadataV3Serializer.INSTANCE.deserializeInputChannelStateHandle(dis) :
+					MetadataV3Serializer.deserializeInputChannelStateHandle(dis) :
 					StateObjectCollection.empty();
 
 			final StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionStateHandles =


[flink] 03/14: [hotfix][state backends] Minor code-style cleanups

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd0946af1057265ef6cbf7a4e8c1fcc04a935589
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 15 13:13:26 2020 +0200

    [hotfix][state backends] Minor code-style cleanups
---
 .../runtime/state/filesystem/FileStateHandle.java  |  4 ++--
 .../state/filesystem/RelativeFileStateHandle.java  | 27 ++++++++++------------
 2 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 3d94709..18eb479 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -37,10 +37,10 @@ public class FileStateHandle implements StreamStateHandle {
 
 	private static final long serialVersionUID = 350284443258002355L;
 
-	/** The path to the file in the filesystem, fully describing the file system */
+	/** The path to the file in the filesystem, fully describing the file system. */
 	private final Path filePath;
 
-	/** The size of the state in the file */
+	/** The size of the state in the file. */
 	private final long stateSize;
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java
index 96b79ef..2ac4969 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java
@@ -23,9 +23,9 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 
 /**
- * {@link StreamStateHandle} for state that was written to a file stream.
- * The differences between {@link FileStateHandle} and {@link RelativeFileStateHandle} is that, {@link RelativeFileStateHandle}
- * contains relativePath for the given handle.
+ * A {@link StreamStateHandle} for state that was written to a file stream.
+ * The difference between {@link FileStateHandle} and {@link RelativeFileStateHandle} is that
+ * {@link RelativeFileStateHandle} contains relativePath for the given handle.
  */
 public class RelativeFileStateHandle extends FileStateHandle {
 	private static final long serialVersionUID = 1L;
@@ -33,26 +33,18 @@ public class RelativeFileStateHandle extends FileStateHandle {
 	private final String relativePath;
 
 	public RelativeFileStateHandle(
-		Path path,
-		String relativePath,
-		long stateSize) {
+			Path path,
+			String relativePath,
+			long stateSize) {
 		super(path, stateSize);
 		this.relativePath = relativePath;
 	}
 
-	@Override
-	public void discardState() throws Exception {
-		super.discardState();
-	}
-
 	public String getRelativePath() {
 		return relativePath;
 	}
 
-	@Override
-	public String toString() {
-		return String.format("RelativeFileStateHandle State: %s, %s [%d bytes]", getFilePath(), relativePath, getStateSize());
-	}
+	// ------------------------------------------------------------------------
 
 	@Override
 	public boolean equals(Object o) {
@@ -72,5 +64,10 @@ public class RelativeFileStateHandle extends FileStateHandle {
 	public int hashCode() {
 		return 17 * super.hashCode() + relativePath.hashCode();
 	}
+
+	@Override
+	public String toString() {
+		return String.format("RelativeFileStateHandle State: %s, %s [%d bytes]", getFilePath(), relativePath, getStateSize());
+	}
 }
 


[flink] 04/14: [FLINK-5763][state backends] (follow-up) Pull scope and relative/absolute path decision out of FsCheckpointStateOutputStream

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bd71c3c484517f138ffa771102eed28a062c47d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 15 14:58:58 2020 +0200

    [FLINK-5763][state backends] (follow-up) Pull scope and relative/absolute path decision out of FsCheckpointStateOutputStream
    
    The FsCheckpointStateOutputStream should not have to be aware of the notion of checkpoint
    state scopes (exclusive / shared), or which one supports which path types.
    
    (There is pre-existing entropy handling code in the FsCheckpointStateOutputStream which arguably
    should not be there either).
---
 .../org/apache/flink/core/fs/EntropyInjector.java  |   4 +
 .../apache/flink/core/fs/EntropyInjectorTest.java  |   7 ++
 .../state/filesystem/FsCheckpointStorage.java      |   4 +-
 .../filesystem/FsCheckpointStreamFactory.java      |  41 ++++----
 .../FsCheckpointStateOutputStreamTest.java         |  33 +++---
 .../filesystem/FsCheckpointStreamFactoryTest.java  | 113 +++++++++++++++++++++
 .../filesystem/FsStateBackendEntropyTest.java      |   2 +-
 7 files changed, 163 insertions(+), 41 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index b0cd9a6..1f67b34 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -89,6 +89,10 @@ public class EntropyInjector {
 
 	// ------------------------------------------------------------------------
 
+	public static boolean isEntropyInjecting(FileSystem fs) {
+		return getEntropyFs(fs) != null;
+	}
+
 	@Nullable
 	public static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
 		if (fs instanceof EntropyInjectingFileSystem) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
index 5c3d253..ce62cbc 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -188,6 +188,13 @@ public class EntropyInjectorTest {
 		}
 	}
 
+	@Test
+	public void testIsEntropyFs() {
+		final FileSystem efs = new TestEntropyInjectingFs("test", "ignored");
+
+		assertTrue(EntropyInjector.isEntropyInjecting(efs));
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 143a077..a7bf331 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 
 import javax.annotation.Nullable;
@@ -176,8 +175,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 				taskOwnedStateDirectory,
 				fileSystem,
 				writeBufferSize,
-				fileSizeThreshold,
-				CheckpointedStateScope.SHARED);
+				fileSizeThreshold);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 90a4faf..4a4db0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -85,6 +84,9 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	/** Cached handle to the file system for file operations. */
 	private final FileSystem filesystem;
 
+	/** Whether the file system dynamically injects entropy into the file paths. */
+	private final boolean entropyInjecting;
+
 	/**
 	 * Creates a new stream factory that stores its checkpoint data in the file system and location
 	 * defined by the given Path.
@@ -124,6 +126,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
 		this.fileStateThreshold = fileStateSizeThreshold;
 		this.writeBufferSize = writeBufferSize;
+		this.entropyInjecting = EntropyInjector.isEntropyInjecting(fileSystem);
 	}
 
 	// ------------------------------------------------------------------------
@@ -133,7 +136,8 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
 		int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
 
-		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, scope);
+		final boolean absolutePath = entropyInjecting || scope == CheckpointedStateScope.SHARED;
+		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, !absolutePath);
 	}
 
 	// ------------------------------------------------------------------------
@@ -153,7 +157,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	 * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
 	 * returns a {@link StreamStateHandle} upon closing.
 	 */
-	public static final class FsCheckpointStateOutputStream
+	public static class FsCheckpointStateOutputStream
 			extends CheckpointStreamFactory.CheckpointStateOutputStream {
 
 		private final byte[] writeBuffer;
@@ -174,14 +178,22 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 		private volatile boolean closed;
 
-		private final CheckpointedStateScope scope;
+		private final boolean allowRelativePaths;
+
+		public FsCheckpointStateOutputStream(
+				Path basePath,
+				FileSystem fs,
+				int bufferSize,
+				int localStateThreshold) {
+			this(basePath, fs, bufferSize, localStateThreshold, false);
+		}
 
 		public FsCheckpointStateOutputStream(
 					Path basePath,
 					FileSystem fs,
 					int bufferSize,
 					int localStateThreshold,
-					CheckpointedStateScope scope) {
+					boolean allowRelativePaths) {
 
 			if (bufferSize < localStateThreshold) {
 				throw new IllegalArgumentException();
@@ -191,7 +203,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			this.fs = fs;
 			this.writeBuffer = new byte[bufferSize];
 			this.localStateThreshold = localStateThreshold;
-			this.scope = scope;
+			this.allowRelativePaths = allowRelativePaths;
 		}
 
 		@Override
@@ -328,20 +340,9 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 							outStream.close();
 
-							if (CheckpointedStateScope.EXCLUSIVE.equals(scope)) {
-								EntropyInjectingFileSystem efs = EntropyInjector.getEntropyFs(fs);
-								// currently, do not use relative state handle for entropy file system
-								if (efs != null) {
-									return new FileStateHandle(statePath, size);
-								} else {
-									return new RelativeFileStateHandle(
-										statePath,
-										relativeStatePath,
-										size);
-								}
-							} else {
-								return new FileStateHandle(statePath, size);
-							}
+							return allowRelativePaths
+									? new RelativeFileStateHandle(statePath, relativeStatePath, size)
+									: new FileStateHandle(statePath, size);
 						} catch (Exception exception) {
 							try {
 								if (statePath != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index d3c9d15..d713e44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -70,13 +69,13 @@ import static org.mockito.Mockito.when;
 @RunWith(Parameterized.class)
 public class FsCheckpointStateOutputStreamTest {
 
-	@Parameterized.Parameters(name = "scope = {0}")
-	public static List<CheckpointedStateScope> parameters() {
-		return Arrays.asList(CheckpointedStateScope.values());
+	@Parameterized.Parameters(name = "relativePaths = {0}")
+	public static List<Boolean> parameters() {
+		return Arrays.asList(true, false);
 	}
 
 	@Parameterized.Parameter
-	public CheckpointedStateScope scope;
+	public boolean relativePaths;
 
 	@Rule
 	public final TemporaryFolder tempDir = new TemporaryFolder();
@@ -85,17 +84,17 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testWrongParameters() throws Exception {
 		// this should fail
 		new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-			Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, scope);
+			Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, relativePaths);
 	}
 
 	@Test
 	public void testEmptyState() throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 				new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
+						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);
 
 		StreamStateHandle handle = stream.closeAndGetHandle();
-		assertTrue(handle == null);
+		assertNull(handle);
 	}
 
 	@Test
@@ -122,7 +121,7 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testGetPos() throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 				new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
+						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
 
 		for (int i = 0; i < 64; ++i) {
 			Assert.assertEquals(i, stream.getPos());
@@ -134,7 +133,7 @@ public class FsCheckpointStateOutputStreamTest {
 		// ----------------------------------------------------
 
 		stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
+				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
 
 		byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
@@ -165,7 +164,7 @@ public class FsCheckpointStateOutputStreamTest {
 			fs,
 			4,
 			0,
-			scope);
+			relativePaths);
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -195,7 +194,7 @@ public class FsCheckpointStateOutputStreamTest {
 			fs,
 			4,
 			0,
-			scope);
+			relativePaths);
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -215,7 +214,7 @@ public class FsCheckpointStateOutputStreamTest {
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 			new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-					Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, scope);
+					Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, relativePaths);
 
 		Random rnd = new Random();
 		byte[] original = new byte[numBytes];
@@ -264,7 +263,7 @@ public class FsCheckpointStateOutputStreamTest {
 	@Test
 	public void testWriteFailsFastWhenClosed() throws Exception {
 		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
+				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);
 
 		assertFalse(stream.isClosed());
 
@@ -303,7 +302,7 @@ public class FsCheckpointStateOutputStreamTest {
 		final Path basePath = Path.fromLocalFile(directory);
 
 		final Supplier<CheckpointStateOutputStream> factory = () ->
-				new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, scope);
+				new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, relativePaths);
 
 		CheckpointStateOutputStream stream1 = factory.get();
 		CheckpointStateOutputStream stream2 = factory.get();
@@ -372,10 +371,10 @@ public class FsCheckpointStateOutputStreamTest {
 		FileSystem fs = spy(FileSystem.getLocalFileSystem());
 
 		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(directory), fs, 1024, 1, scope);
+				Path.fromLocalFile(directory), fs, 1024, 1, relativePaths);
 
 		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(directory), fs, 1024, 1, scope);
+				Path.fromLocalFile(directory), fs, 1024, 1, relativePaths);
 
 		stream1.write(new byte[61]);
 		stream2.write(new byte[61]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
new file mode 100644
index 0000000..1fa5cc0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link FsCheckpointStreamFactory}.
+ */
+public class FsCheckpointStreamFactoryTest {
+
+	@Rule
+	public final TemporaryFolder TMP = new TemporaryFolder();
+
+	private Path exclusiveStateDir;
+	private Path sharedStateDir;
+
+	@Before
+	public void createStateDirectories() throws IOException {
+		exclusiveStateDir = Path.fromLocalFile(TMP.newFolder("exclusive"));
+		sharedStateDir = Path.fromLocalFile(TMP.newFolder("shared"));
+	}
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testExclusiveStateHasRelativePathHandles() throws IOException {
+		final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem());
+
+		final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+				factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+		stream.write(1657);
+		final StreamStateHandle handle = stream.closeAndGetHandle();
+
+		assertThat(handle, instanceOf(RelativeFileStateHandle.class));
+		assertPathsEqual(exclusiveStateDir, ((RelativeFileStateHandle) handle).getFilePath().getParent());
+	}
+
+	@Test
+	public void testSharedStateHasAbsolutePathHandles() throws IOException {
+		final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem());
+
+		final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+			factory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
+		stream.write(0);
+		final StreamStateHandle handle = stream.closeAndGetHandle();
+
+		assertThat(handle, instanceOf(FileStateHandle.class));
+		assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
+		assertPathsEqual(sharedStateDir, ((FileStateHandle) handle).getFilePath().getParent());
+	}
+
+	@Test
+	public void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException{
+		final FsCheckpointStreamFactory factory = createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs());
+
+		final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+			factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+		stream.write(0);
+		final StreamStateHandle handle = stream.closeAndGetHandle();
+
+		assertThat(handle, instanceOf(FileStateHandle.class));
+		assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
+		assertPathsEqual(exclusiveStateDir, ((FileStateHandle) handle).getFilePath().getParent());
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utils
+	// ------------------------------------------------------------------------
+
+	private static void assertPathsEqual(Path expected, Path actual) {
+		final Path reNormalizedExpected = new Path(expected.toString());
+		final Path reNormalizedActual = new Path(actual.toString());
+		assertEquals(reNormalizedExpected, reNormalizedActual);
+	}
+
+	private FsCheckpointStreamFactory createFactory(FileSystem fs) {
+		return new FsCheckpointStreamFactory(fs, exclusiveStateDir, sharedStateDir, 0, 4096);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index 47955b1..72ed803 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -109,7 +109,7 @@ public class FsStateBackendEntropyTest {
 		}
 	}
 
-	private static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+	static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
 
 		@Override
 		public String getEntropyInjectionKey() {


[flink] 05/14: [hotfix][tests] Make FsCheckpointStateOutputStreamTest work on Windows OS

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21f8234dad2406e864ef1f91630ad5594becb414
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat May 16 00:42:17 2020 +0200

    [hotfix][tests] Make FsCheckpointStateOutputStreamTest work on Windows OS
---
 .../runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index d713e44..de9b0e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -53,6 +53,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
@@ -365,7 +366,9 @@ public class FsCheckpointStateOutputStreamTest {
 		final File directory = tempDir.newFolder();
 
 		// prevent creation of files in that directory
-		assertTrue(directory.setWritable(false, true));
+		// this operation does not work reliably on Windows, so we use an "assume" to skip the test
+		// if this prerequisite operation is not supported.
+		assumeTrue(directory.setWritable(false, true));
 		checkDirectoryNotWritable(directory);
 
 		FileSystem fs = spy(FileSystem.getLocalFileSystem());


[flink] 08/14: [FLINK-17694][core] Fix argument check for SimpleVersionedSerialization

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ba9b38cd750d12db759007235cc7f8497ee4170
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 19:40:15 2020 +0200

    [FLINK-17694][core] Fix argument check for SimpleVersionedSerialization
---
 .../core/io/SimpleVersionedSerialization.java      |  2 +-
 .../core/io/SimpleVersionedSerializationTest.java  | 49 +++++++++++++---------
 2 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
index 2c5b68c..92c2bb9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -146,7 +146,7 @@ public class SimpleVersionedSerialization {
 	public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
 		checkNotNull(serializer, "serializer");
 		checkNotNull(bytes, "bytes");
-		checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
+		checkArgument(bytes.length >= 8, "byte array below minimum length (8 bytes)");
 
 		final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, bytes.length);
 		final int version =
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
index 116d37c..d3648e4 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -36,26 +36,7 @@ public class SimpleVersionedSerializationTest {
 
 	@Test
 	public void testSerializationRoundTrip() throws IOException {
-		final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
-
-			private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
-
-			@Override
-			public int getVersion() {
-				return VERSION;
-			}
-
-			@Override
-			public byte[] serialize(String str) throws IOException {
-				return str.getBytes(StandardCharsets.UTF_8);
-			}
-
-			@Override
-			public String deserialize(int version, byte[] serialized) throws IOException {
-				assertEquals(VERSION, version);
-				return new String(serialized, StandardCharsets.UTF_8);
-			}
-		};
+		final SimpleVersionedSerializer<String> utfEncoder = new TestStringSerializer();
 
 		final String testString = "dugfakgs";
 		final DataOutputSerializer out = new DataOutputSerializer(32);
@@ -109,4 +90,32 @@ public class SimpleVersionedSerializationTest {
 		assertEquals(testString, deserialized);
 		assertEquals(testString, deserializedFromBytes);
 	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testUnderflow() throws Exception {
+		SimpleVersionedSerialization.readVersionAndDeSerialize(new TestStringSerializer(), new byte[7]);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestStringSerializer implements SimpleVersionedSerializer<String> {
+
+		private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public byte[] serialize(String str) throws IOException {
+			return str.getBytes(StandardCharsets.UTF_8);
+		}
+
+		@Override
+		public String deserialize(int version, byte[] serialized) throws IOException {
+			assertEquals(VERSION, version);
+			return new String(serialized, StandardCharsets.UTF_8);
+		}
+	}
 }


[flink] 10/14: [FLINK-10740][DataStream API] Simplify SourceOperator by using SimpleVersionedListState

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61d0344cb973fd42c210d9a97f3ea1c0697445ce
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 20:50:29 2020 +0200

    [FLINK-10740][DataStream API] Simplify SourceOperator by using SimpleVersionedListState
---
 .../java/org/apache/flink/util/CollectionUtil.java | 17 +++++
 .../streaming/api/operators/SourceOperator.java    | 75 ++++------------------
 .../api/operators/SourceOperatorTest.java          | 20 ++----
 3 files changed, 36 insertions(+), 76 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
index 45ceaa4..89d61f9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
@@ -20,8 +20,11 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -82,4 +85,18 @@ public final class CollectionUtil {
 			.map(projector)
 			.collect(toList());
 	}
+
+	/**
+	 * Collects the elements in the Iterable in a List. If the iterable argument is null,
+	 * this method returns an empty list.
+	 */
+	public static <E> List<E> iterableToList(@Nullable Iterable<E> iterable) {
+		if (iterable == null) {
+			return Collections.emptyList();
+		}
+
+		final ArrayList<E> list = new ArrayList<>();
+		iterable.iterator().forEachRemaining(list::add);
+		return list;
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 84c0a60..4b17b0d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -38,12 +38,11 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.util.CollectionUtil;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
@@ -66,19 +65,20 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
 	private final Source<OUT, SplitT, ?> source;
 
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+
 	// Fields that will be setup at runtime.
 	private transient SourceReader<OUT, SplitT> sourceReader;
-	private transient SimpleVersionedSerializer<SplitT> splitSerializer;
-	private transient ListState<byte[]> readerState;
+	private transient ListState<SplitT> readerState;
 	private transient OperatorEventGateway operatorEventGateway;
 
 	public SourceOperator(Source<OUT, SplitT, ?> source) {
 		this.source = source;
+		this.splitSerializer = source.getSplitSerializer();
 	}
 
 	@Override
 	public void open() throws Exception {
-		splitSerializer = source.getSplitSerializer();
 		// Create the source reader.
 		SourceReaderContext context = new SourceReaderContext() {
 			@Override
@@ -94,16 +94,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 		sourceReader = source.createReader(context);
 
 		// restore the state if necessary.
-		if (readerState.get() != null && readerState.get().iterator().hasNext()) {
-			List<SplitT> splits = new ArrayList<>();
-			for (byte[] splitBytes : readerState.get()) {
-				SplitStateAndVersion stateWithVersion = SplitStateAndVersion.fromBytes(splitBytes);
-				splits.add(splitSerializer.deserialize(
-						stateWithVersion.getSerializerVersion(),
-						stateWithVersion.getSplitState()));
-			}
+		final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
+		if (!splits.isEmpty()) {
 			sourceReader.addSplits(splits);
 		}
+
 		// Start the reader.
 		sourceReader.start();
 		// Register the reader to the coordinator.
@@ -128,15 +123,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	@Override
 	public void snapshotState(StateSnapshotContext context) throws Exception {
 		LOG.debug("Taking a snapshot for checkpoint {}", context.getCheckpointId());
-		List<SplitT> splitStates = sourceReader.snapshotState();
-		List<byte[]> state = new ArrayList<>();
-		for (SplitT splitState : splitStates) {
-			SplitStateAndVersion stateWithVersion = new SplitStateAndVersion(
-					splitSerializer.getVersion(),
-					splitSerializer.serialize(splitState));
-			state.add(stateWithVersion.toBytes());
-		}
-		readerState.update(state);
+		readerState.update(sourceReader.snapshotState());
 	}
 
 	@Override
@@ -147,7 +134,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	@Override
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
-		readerState = context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
+		final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
+		readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
 	}
 
 	public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
@@ -178,43 +166,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 		return sourceReader;
 	}
 
-	// --------------- private class -----------------
-
-	/**
-	 * Static container class. Package private for testing.
-	 */
 	@VisibleForTesting
-	static class SplitStateAndVersion {
-		private final int serializerVersion;
-		private final byte[] splitState;
-
-		SplitStateAndVersion(int serializerVersion, byte[] splitState) {
-			this.serializerVersion = serializerVersion;
-			this.splitState = splitState;
-		}
-
-		int getSerializerVersion() {
-			return serializerVersion;
-		}
-
-		byte[] getSplitState() {
-			return splitState;
-		}
-
-		byte[] toBytes() {
-			// 4 Bytes - Serialization Version
-			// N Bytes - Serialized Split State
-			ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + splitState.length);
-			buf.putInt(serializerVersion);
-			buf.put(splitState);
-			return buf.array();
-		}
-
-		static SplitStateAndVersion fromBytes(byte[] bytes) {
-			ByteBuffer buf = ByteBuffer.wrap(bytes);
-			int version = buf.getInt();
-			byte[] splitState = Arrays.copyOfRange(bytes, buf.position(), buf.limit());
-			return new SplitStateAndVersion(version, splitState);
-		}
+	ListState<SplitT> getReaderState() {
+		return readerState;
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 65239c8..453b083 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -41,11 +42,11 @@ import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -134,17 +135,7 @@ public class SourceOperatorTest {
 		operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
 
 		// Verify the splits in state.
-		List<MockSourceSplit> splitsInState = new ArrayList<>();
-		Iterable<byte[]> serializedSplits =
-				stateContext.getOperatorStateStore().getListState(SourceOperator.SPLITS_STATE_DESC).get();
-		for (byte[] serialized : serializedSplits) {
-			MockSourceSplitSerializer serializer = new MockSourceSplitSerializer();
-			SourceOperator.SplitStateAndVersion stateAndVersion =
-					SourceOperator.SplitStateAndVersion.fromBytes(serialized);
-			splitsInState.add(serializer.deserialize(
-					stateAndVersion.getSerializerVersion(),
-					stateAndVersion.getSplitState()));
-		}
+		List<MockSourceSplit> splitsInState = CollectionUtil.iterableToList(operator.getReaderState().get());
 		assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState);
 	}
 
@@ -152,9 +143,8 @@ public class SourceOperatorTest {
 
 	private StateInitializationContext getStateContext() throws Exception {
 		// Create a mock split.
-		byte[] serializedSplit = new MockSourceSplitSerializer().serialize(MOCK_SPLIT);
-		byte[] serializedSplitWithVersion =
-				new SourceOperator.SplitStateAndVersion(0, serializedSplit).toBytes();
+		byte[] serializedSplitWithVersion = SimpleVersionedSerialization
+				.writeVersionAndSerialize(new MockSourceSplitSerializer(), MOCK_SPLIT);
 
 		// Crate the state context.
 		OperatorStateStore operatorStateStore = createOperatorStateStore();


[flink] 02/14: [FLINK-5763][state backends] Make savepoint selfcontain and relocatable

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5583c75394972dd4564132e3a0896a7f4f85a19
Author: klion26 <qc...@gmail.com>
AuthorDate: Sat Apr 25 20:35:38 2020 +0800

    [FLINK-5763][state backends] Make savepoint selfcontain and relocatable
---
 .../org/apache/flink/core/fs/EntropyInjector.java  |  2 +-
 .../flink/state/api/runtime/SavepointLoader.java   |  2 +-
 .../flink/runtime/checkpoint/Checkpoints.java      |  8 +--
 .../metadata/ChannelStateHandleSerializer.java     |  2 +-
 .../checkpoint/metadata/MetadataSerializer.java    |  3 +-
 .../checkpoint/metadata/MetadataV1Serializer.java  |  2 +-
 .../checkpoint/metadata/MetadataV2Serializer.java  | 12 ++--
 .../metadata/MetadataV2V3SerializerBase.java       | 62 +++++++++++++-----
 .../checkpoint/metadata/MetadataV3Serializer.java  | 10 +--
 .../state/filesystem/FsCheckpointStorage.java      |  6 +-
 .../filesystem/FsCheckpointStreamFactory.java      | 34 ++++++++--
 .../FsCompletedCheckpointStorageLocation.java      |  4 ++
 .../state/filesystem/RelativeFileStateHandle.java  | 76 ++++++++++++++++++++++
 .../metadata/CheckpointMetadataTest.java           |  2 +-
 .../checkpoint/metadata/CheckpointTestUtils.java   | 51 +++++++++++----
 .../metadata/MetadataV3SerializerTest.java         | 67 +++++++++++++++----
 .../IncrementalRemoteKeyedStateHandleTest.java     |  2 +-
 .../FsCheckpointStateOutputStreamTest.java         | 38 +++++++----
 .../StreamTaskStateInitializerImplTest.java        |  8 +--
 .../flink/streaming/util/OperatorSnapshotUtil.java | 10 +--
 .../flink/test/checkpointing/SavepointITCase.java  | 27 +++++++-
 21 files changed, 334 insertions(+), 94 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index fb49008..b0cd9a6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -90,7 +90,7 @@ public class EntropyInjector {
 	// ------------------------------------------------------------------------
 
 	@Nullable
-	private static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
+	public static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
 		if (fs instanceof EntropyInjectingFileSystem) {
 			return (EntropyInjectingFileSystem) fs;
 		}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
index 95c89ca..ee20829 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
@@ -50,7 +50,7 @@ public final class SavepointLoader {
 			.resolveCheckpointPointer(savepointPath);
 
 		try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
-			return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
+			return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader(), savepointPath);
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 3033ec4..6e23131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -94,7 +94,7 @@ public class Checkpoints {
 	//  Reading and validating checkpoint metadata
 	// ------------------------------------------------------------------------
 
-	public static CheckpointMetadata loadCheckpointMetadata(DataInputStream in, ClassLoader classLoader) throws IOException {
+	public static CheckpointMetadata loadCheckpointMetadata(DataInputStream in, ClassLoader classLoader, String externalPointer) throws IOException {
 		checkNotNull(in, "input stream");
 		checkNotNull(classLoader, "classLoader");
 
@@ -103,7 +103,7 @@ public class Checkpoints {
 		if (magicNumber == HEADER_MAGIC_NUMBER) {
 			final int version = in.readInt();
 			final MetadataSerializer serializer = MetadataSerializers.getSerializer(version);
-			return serializer.deserialize(in, classLoader);
+			return serializer.deserialize(in, classLoader, externalPointer);
 		}
 		else {
 			throw new IOException("Unexpected magic number. This can have multiple reasons: " +
@@ -132,7 +132,7 @@ public class Checkpoints {
 		final CheckpointMetadata checkpointMetadata;
 		try (InputStream in = metadataHandle.openInputStream()) {
 			DataInputStream dis = new DataInputStream(in);
-			checkpointMetadata = loadCheckpointMetadata(dis, classLoader);
+			checkpointMetadata = loadCheckpointMetadata(dis, classLoader, checkpointPointer);
 		}
 
 		// generate mapping from operator to task
@@ -225,7 +225,7 @@ public class Checkpoints {
 		try (InputStream in = metadataHandle.openInputStream();
 			DataInputStream dis = new DataInputStream(in)) {
 
-			metadata = loadCheckpointMetadata(dis, classLoader);
+			metadata = loadCheckpointMetadata(dis, classLoader, pointer);
 		}
 
 		Exception exception = null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
index d69241e..f7b2c8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
@@ -88,6 +88,6 @@ class ChannelStateHandleSerializer {
 		for (int i = 0; i < offsetsSize; i++) {
 			offsets.add(dis.readLong());
 		}
-		return handleBuilder.apply(deserializeStreamStateHandle(dis), offsets, info);
+		return handleBuilder.apply(deserializeStreamStateHandle(dis, null), offsets, info);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializer.java
index e9aa145..ff8c3a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializer.java
@@ -36,8 +36,9 @@ public interface MetadataSerializer extends Versioned {
 	 *
 	 * @param dis Input stream to deserialize savepoint from
 	 * @param  userCodeClassLoader the user code class loader
+	 * @param externalPointer the external pointer of the given checkpoint
 	 * @return The deserialized savepoint
 	 * @throws IOException Serialization failures are forwarded
 	 */
-	CheckpointMetadata deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
+	CheckpointMetadata deserialize(DataInputStream dis, ClassLoader userCodeClassLoader, String externalPointer) throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV1Serializer.java
index fb6d982..7f2da82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV1Serializer.java
@@ -44,7 +44,7 @@ public class MetadataV1Serializer implements MetadataSerializer {
 	}
 
 	@Override
-	public CheckpointMetadata deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
+	public CheckpointMetadata deserialize(DataInputStream dis, ClassLoader cl, String externalPointer) throws IOException {
 		throw new IOException("This savepoint / checkpoint version (Flink 1.1 / 1.2) is no longer supported.");
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
index bbf25d8..e02505a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
@@ -56,8 +56,8 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 	// ------------------------------------------------------------------------
 
 	@Override
-	public CheckpointMetadata deserialize(DataInputStream dis, ClassLoader classLoader) throws IOException {
-		return deserializeMetadata(dis);
+	public CheckpointMetadata deserialize(DataInputStream dis, ClassLoader classLoader, String externalPointer) throws IOException {
+		return deserializeMetadata(dis, externalPointer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -89,7 +89,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorState deserializeOperatorState(DataInputStream dis) throws IOException {
+	protected OperatorState deserializeOperatorState(DataInputStream dis, String externalPointer) throws IOException {
 		final OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
 		final int parallelism = dis.readInt();
 		final int maxParallelism = dis.readInt();
@@ -106,7 +106,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 
 		for (int j = 0; j < numSubTaskStates; j++) {
 			final int subtaskIndex = dis.readInt();
-			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis);
+			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, externalPointer);
 			taskState.putState(subtaskIndex, subtaskState);
 		}
 
@@ -125,7 +125,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, String externalPointer) throws IOException {
 		// read two unused fields for compatibility:
 		//   - "duration"
 		//   - number of legacy states
@@ -138,6 +138,6 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 					"no longer supported.");
 		}
 
-		return super.deserializeSubtaskState(dis);
+		return super.deserializeSubtaskState(dis, externalPointer);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 3101bd0..2ea18be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -37,7 +37,10 @@ import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.FunctionWithException;
@@ -58,6 +61,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Base (De)serializer for checkpoint metadata format version 2 and 3.
  *
@@ -88,6 +93,10 @@ public abstract class MetadataV2V3SerializerBase {
 	private static final byte KEY_GROUPS_HANDLE = 3;
 	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
 	private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
+	private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
+
+	private static Path exclusiveCheckpointDir = null; // NOTE(review): mutable static shared by every deserialize call; written without synchronization
+	private static String previousExternalPointer = null; // NOTE(review): updated separately from the dir above; if restores run concurrently the pair can mix pointers. TODO: pass the resolved dir per call
 
 	// ------------------------------------------------------------------------
 	//  (De)serialization entry points
@@ -113,7 +122,7 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	protected CheckpointMetadata deserializeMetadata(DataInputStream dis) throws IOException {
+	protected CheckpointMetadata deserializeMetadata(DataInputStream dis, String externalPointer) throws IOException {
 		// first: checkpoint ID
 		final long checkpointId = dis.readLong();
 		if (checkpointId < 0) {
@@ -142,7 +151,7 @@ public abstract class MetadataV2V3SerializerBase {
 		final List<OperatorState> operatorStates = new ArrayList<>(numTaskStates);
 
 		for (int i = 0; i < numTaskStates; i++) {
-			operatorStates.add(deserializeOperatorState(dis));
+			operatorStates.add(deserializeOperatorState(dis, externalPointer));
 		}
 
 		return new CheckpointMetadata(checkpointId, operatorStates, masterStates);
@@ -211,7 +220,7 @@ public abstract class MetadataV2V3SerializerBase {
 
 	protected abstract void serializeOperatorState(OperatorState operatorState, DataOutputStream dos) throws IOException;
 
-	protected abstract OperatorState deserializeOperatorState(DataInputStream dis) throws IOException;
+	protected abstract OperatorState deserializeOperatorState(DataInputStream dis, String externalPointer) throws IOException;
 
 	// ------------------------------------------------------------------------
 	//  operator subtask state (de)serialization methods
@@ -241,15 +250,15 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, String externalPointer) throws IOException {
 		final boolean hasManagedOperatorState = dis.readInt() != 0;
-		final OperatorStateHandle managedOperatorState = hasManagedOperatorState ? deserializeOperatorStateHandle(dis) : null;
+		final OperatorStateHandle managedOperatorState = hasManagedOperatorState ? deserializeOperatorStateHandle(dis, externalPointer) : null;
 
 		final boolean hasRawOperatorState = dis.readInt() != 0;
-		final OperatorStateHandle rawOperatorState = hasRawOperatorState ? deserializeOperatorStateHandle(dis) : null;
+		final OperatorStateHandle rawOperatorState = hasRawOperatorState ? deserializeOperatorStateHandle(dis, externalPointer) : null;
 
-		final KeyedStateHandle managedKeyedState = deserializeKeyedStateHandle(dis);
-		final KeyedStateHandle rawKeyedState = deserializeKeyedStateHandle(dis);
+		final KeyedStateHandle managedKeyedState = deserializeKeyedStateHandle(dis, externalPointer);
+		final KeyedStateHandle rawKeyedState = deserializeKeyedStateHandle(dis, externalPointer);
 
 		StateObjectCollection<InputChannelStateHandle> inputChannelState = deserializeInputChannelStateHandle(dis);
 
@@ -312,14 +321,15 @@ public abstract class MetadataV2V3SerializerBase {
 	}
 
 	private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(
-			DataInputStream dis) throws IOException {
+			DataInputStream dis,
+			String externalPointer) throws IOException {
 
 		final int size = dis.readInt();
 		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
 
 		for (int i = 0; i < size; ++i) {
 			StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
 			result.put(stateHandleID, stateHandle);
 		}
 
@@ -327,7 +337,7 @@ public abstract class MetadataV2V3SerializerBase {
 	}
 
 	@VisibleForTesting
-	public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+	public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis, String externalPointer) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 
@@ -344,7 +354,7 @@ public abstract class MetadataV2V3SerializerBase {
 			}
 			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
 				keyGroupRange, offsets);
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
 			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
 		} else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
 
@@ -355,9 +365,9 @@ public abstract class MetadataV2V3SerializerBase {
 			KeyGroupRange keyGroupRange =
 				KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
 
-			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis);
-			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis);
-			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);
+			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, externalPointer);
+			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis, externalPointer);
+			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis, externalPointer);
 
 			UUID uuid;
 
@@ -411,7 +421,8 @@ public abstract class MetadataV2V3SerializerBase {
 
 	@VisibleForTesting
 	public static OperatorStateHandle deserializeOperatorStateHandle(
-			DataInputStream dis) throws IOException {
+			DataInputStream dis,
+			String externalPointer) throws IOException {
 
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
@@ -434,7 +445,7 @@ public abstract class MetadataV2V3SerializerBase {
 						new OperatorStateHandle.StateMetaInfo(offsets, mode);
 				offsetsMap.put(key, metaInfo);
 			}
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
 			return new OperatorStreamStateHandle(offsetsMap, stateHandle);
 		} else {
 			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
@@ -448,6 +459,11 @@ public abstract class MetadataV2V3SerializerBase {
 		if (stateHandle == null) {
 			dos.writeByte(NULL_HANDLE);
 
+		} else if (stateHandle instanceof RelativeFileStateHandle) {
+			dos.writeByte(RELATIVE_STREAM_STATE_HANDLE);
+			RelativeFileStateHandle relativeFileStateHandle = (RelativeFileStateHandle) stateHandle;
+			dos.writeUTF(relativeFileStateHandle.getRelativePath());
+			dos.writeLong(relativeFileStateHandle.getStateSize());
 		} else if (stateHandle instanceof FileStateHandle) {
 			dos.writeByte(FILE_STREAM_STATE_HANDLE);
 			FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
@@ -468,7 +484,7 @@ public abstract class MetadataV2V3SerializerBase {
 		dos.flush();
 	}
 
-	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis, @Nullable String externalPointer) throws IOException {
 		final int type = dis.read();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -482,6 +498,16 @@ public abstract class MetadataV2V3SerializerBase {
 			byte[] data = new byte[numBytes];
 			dis.readFully(data);
 			return new ByteStreamStateHandle(handleName, data);
+		} else if (RELATIVE_STREAM_STATE_HANDLE == type) {
+			checkArgument(externalPointer != null, "external pointer should not be null when deserializing relative state handle.");
+			String relativePath = dis.readUTF();
+			long size = dis.readLong();
+			if (exclusiveCheckpointDir == null || (!externalPointer.equals(previousExternalPointer))) {
+				exclusiveCheckpointDir = ((FsCompletedCheckpointStorageLocation) (AbstractFsCheckpointStorage.resolveCheckpointPointer(externalPointer))).getExclusiveCheckpointDir();
+				previousExternalPointer = externalPointer;
+			}
+			Path statePath = new Path(exclusiveCheckpointDir, relativePath);
+			return new RelativeFileStateHandle(statePath, relativePath, size);
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index cfd648f..3376ce7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -71,8 +71,8 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	public CheckpointMetadata deserialize(DataInputStream dis, ClassLoader classLoader) throws IOException {
-		return deserializeMetadata(dis);
+	public CheckpointMetadata deserialize(DataInputStream dis, ClassLoader classLoader, String externalPointer) throws IOException {
+		return deserializeMetadata(dis, externalPointer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -109,7 +109,7 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorState deserializeOperatorState(DataInputStream dis) throws IOException {
+	protected OperatorState deserializeOperatorState(DataInputStream dis, String externalPointer) throws IOException {
 		final OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
 		final int parallelism = dis.readInt();
 		final int maxParallelism = dis.readInt();
@@ -117,14 +117,14 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		final OperatorState operatorState = new OperatorState(jobVertexId, parallelism, maxParallelism);
 
 		// Coordinator state
-		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis));
+		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis, externalPointer));
 
 		// Sub task states
 		final int numSubTaskStates = dis.readInt();
 
 		for (int j = 0; j < numSubTaskStates; j++) {
 			final int subtaskIndex = dis.readInt();
-			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis);
+			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, externalPointer);
 			operatorState.putState(subtaskIndex, subtaskState);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 83cca17..143a077 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 
 import javax.annotation.Nullable;
@@ -169,11 +170,14 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 
 	@Override
 	public CheckpointStateOutputStream createTaskOwnedStateStream() {
+		// Per CheckpointStorageWorkerView#createTaskOwnedStateStream, task-owned state may later become shared state,
+		// so use SHARED scope: the stream then returns an absolute FileStateHandle, never a RelativeFileStateHandle.
 		return new FsCheckpointStateOutputStream(
 				taskOwnedStateDirectory,
 				fileSystem,
 				writeBufferSize,
-				fileSizeThreshold);
+				fileSizeThreshold,
+				CheckpointedStateScope.SHARED);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 379da0e..90a4faf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -132,7 +133,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
 		int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
 
-		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
+		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, scope);
 	}
 
 	// ------------------------------------------------------------------------
@@ -169,11 +170,18 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 		private Path statePath;
 
+		private String relativeStatePath;
+
 		private volatile boolean closed;
 
+		private final CheckpointedStateScope scope;
+
 		public FsCheckpointStateOutputStream(
-					Path basePath, FileSystem fs,
-					int bufferSize, int localStateThreshold) {
+					Path basePath,
+					FileSystem fs,
+					int bufferSize,
+					int localStateThreshold,
+					CheckpointedStateScope scope) {
 
 			if (bufferSize < localStateThreshold) {
 				throw new IllegalArgumentException();
@@ -183,6 +191,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			this.fs = fs;
 			this.writeBuffer = new byte[bufferSize];
 			this.localStateThreshold = localStateThreshold;
+			this.scope = scope;
 		}
 
 		@Override
@@ -319,7 +328,20 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 							outStream.close();
 
-							return new FileStateHandle(statePath, size);
+							if (CheckpointedStateScope.EXCLUSIVE.equals(scope)) {
+								EntropyInjectingFileSystem efs = EntropyInjector.getEntropyFs(fs);
+								// currently, do not use relative state handle for entropy file system
+								if (efs != null) {
+									return new FileStateHandle(statePath, size);
+								} else {
+									return new RelativeFileStateHandle(
+										statePath,
+										relativeStatePath,
+										size);
+								}
+							} else {
+								return new FileStateHandle(statePath, size);
+							}
 						} catch (Exception exception) {
 							try {
 								if (statePath != null) {
@@ -346,7 +368,9 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		}
 
 		private Path createStatePath() {
-			return new Path(basePath, UUID.randomUUID().toString());
+			// keep the bare file name, so the finished handle can also carry a path relative to basePath
+			relativeStatePath = UUID.randomUUID().toString();
+			return new Path(basePath, relativeStatePath);
 		}
 
 		private void createStream() throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java
index f21f9d1..87f315d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java
@@ -57,6 +57,10 @@ public class FsCompletedCheckpointStorageLocation implements CompletedCheckpoint
 		return externalPointer;
 	}
 
+	public Path getExclusiveCheckpointDir() {
+		return exclusiveCheckpointDir;
+	}
+
 	@Override
 	public FileStateHandle getMetadataHandle() {
 		return metadataFileHandle;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java
new file mode 100644
index 0000000..96b79ef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link StreamStateHandle} backed by a file which, unlike a plain {@link FileStateHandle}, also records
+ * the file's path relative to its base directory. This is used to make savepoints relocatable.
+ */
+public class RelativeFileStateHandle extends FileStateHandle {
+	private static final long serialVersionUID = 1L;
+
+	/** The path of the state file relative to its base directory; never null. */
+	private final String relativePath;
+
+	/**
+	 * @param path The absolute path of the state file.
+	 * @param relativePath The path of the state file relative to its base directory; must not be null.
+	 * @param stateSize The size of the state, in bytes.
+	 */
+	public RelativeFileStateHandle(
+		Path path,
+		String relativePath,
+		long stateSize) {
+		super(path, stateSize);
+		this.relativePath = Preconditions.checkNotNull(relativePath, "relativePath");
+	}
+
+	public String getRelativePath() {
+		return relativePath;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("RelativeFileStateHandle State: %s, %s [%d bytes]", getFilePath(), relativePath, getStateSize());
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+
+		if (!(o instanceof RelativeFileStateHandle)) {
+			return false;
+		}
+
+		RelativeFileStateHandle other = (RelativeFileStateHandle) o;
+		return super.equals(o) && relativePath.equals(other.relativePath);
+	}
+
+	@Override
+	public int hashCode() {
+		return 17 * super.hashCode() + relativePath.hashCode();
+	}
+}
+
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
index f4cc7d4..ea71b51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
@@ -45,7 +45,7 @@ public class CheckpointMetadataTest {
 		final int numMasterStates = 7;
 
 		Collection<OperatorState> taskStates =
-				CheckpointTestUtils.createOperatorStates(rnd, numTaskStates, numSubtaskStates);
+				CheckpointTestUtils.createOperatorStates(rnd, null, numTaskStates, numSubtaskStates);
 
 		Collection<MasterState> masterStates =
 				CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index fc51f92..64cfafd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint.metadata;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -35,9 +36,12 @@ import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -61,9 +65,14 @@ public class CheckpointTestUtils {
 
 	/**
 	 * Creates a random collection of OperatorState objects containing various types of state handles.
+	 *
+	 * @param basePath The basePath for savepoint, will be null for checkpoint.
+	 * @param numTaskStates Number of tasks.
+	 * @param numSubtasksPerTask Number of subtask for each task.
 	 */
 	public static Collection<OperatorState> createOperatorStates(
 			Random random,
+			@Nullable String basePath,
 			int numTaskStates,
 			int numSubtasksPerTask) {
 
@@ -75,7 +84,7 @@ public class CheckpointTestUtils {
 
 			final boolean hasCoordinatorState = random.nextBoolean();
 			if (hasCoordinatorState) {
-				final StreamStateHandle stateHandle = createDummyStreamStateHandle(random);
+				final StreamStateHandle stateHandle = createDummyStreamStateHandle(random, basePath);
 				taskState.setCoordinatorState(stateHandle);
 			}
 
@@ -113,22 +122,22 @@ public class CheckpointTestUtils {
 				KeyedStateHandle keyedStateStream = null;
 
 				if (hasKeyedBackend) {
-					if (isIncremental) {
+					if (isIncremental && !isSavepoint(basePath)) {
 						keyedStateBackend = createDummyIncrementalKeyedStateHandle(random);
 					} else {
-						keyedStateBackend = createDummyKeyGroupStateHandle(random);
+						keyedStateBackend = createDummyKeyGroupStateHandle(random, basePath);
 					}
 				}
 
 				if (hasKeyedStream) {
-					keyedStateStream = createDummyKeyGroupStateHandle(random);
+					keyedStateStream = createDummyKeyGroupStateHandle(random, basePath);
 				}
 
 				StateObjectCollection<InputChannelStateHandle> inputChannelStateHandles =
-					random.nextBoolean() ? singleton(createNewInputChannelStateHandle(random.nextInt(5), random)) : empty();
+					(random.nextBoolean() && !isSavepoint(basePath)) ? singleton(createNewInputChannelStateHandle(random.nextInt(5), random)) : empty();
 
 				StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionStateHandles =
-					random.nextBoolean() ? singleton(createNewResultSubpartitionStateHandle(random.nextInt(5), random)) : empty();
+					(random.nextBoolean() && !isSavepoint(basePath)) ? singleton(createNewResultSubpartitionStateHandle(random.nextInt(5), random)) : empty();
 
 				taskState.putState(subtaskIdx, new OperatorSubtaskState(
 						operatorStateHandleBackend,
@@ -145,6 +154,10 @@ public class CheckpointTestUtils {
 		return taskStates;
 	}
 
+	private static boolean isSavepoint(@Nullable String basePath) {
+		return basePath != null; // only savepoints are generated with a base path
+	}
+
 	/**
 	 * Creates a bunch of random master states.
 	 */
@@ -188,7 +201,7 @@ public class CheckpointTestUtils {
 			42L,
 			createRandomStateHandleMap(rnd),
 			createRandomStateHandleMap(rnd),
-			createDummyStreamStateHandle(rnd));
+			createDummyStreamStateHandle(rnd, null));
 	}
 
 	public static Map<StateHandleID, StreamStateHandle> createRandomStateHandleMap(Random rnd) {
@@ -196,23 +209,33 @@ public class CheckpointTestUtils {
 		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
 		for (int i = 0; i < size; ++i) {
 			StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString());
-			StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd);
+			StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd, null);
 			result.put(randomId, stateHandle);
 		}
 
 		return result;
 	}
 
-	public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
+	public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd, @Nullable String basePath) {
 		return new KeyGroupsStateHandle(
 			new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}),
-			createDummyStreamStateHandle(rnd));
+			createDummyStreamStateHandle(rnd, basePath));
 	}
 
-	public static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
-		return new ByteStreamStateHandle(
-			String.valueOf(createRandomUUID(rnd)),
-			String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));
+	public static StreamStateHandle createDummyStreamStateHandle(Random rnd, @Nullable String basePath) {
+		if (!isSavepoint(basePath)) {
+			// checkpoint: a byte-array handle with random name and contents
+			return new ByteStreamStateHandle(
+				String.valueOf(createRandomUUID(rnd)),
+				String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));
+		}
+		// savepoint: a file handle that also records its path relative to basePath,
+		// as used for relocatable savepoints.
+		// Mask the sign bit for a non-negative size; negating Long.MIN_VALUE would overflow.
+		final long stateSize = rnd.nextLong() & Long.MAX_VALUE;
+		final String relativePath = String.valueOf(createRandomUUID(rnd));
+		final Path statePath = new Path(basePath, relativePath);
+		return new RelativeFileStateHandle(statePath, relativePath, stateSize);
 	}
 
 	private static UUID createRandomUUID(Random rnd) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
index 9e4456c..e5e9dbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.metadata;
 
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -25,7 +27,11 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -42,6 +48,9 @@ import static org.junit.Assert.assertEquals;
  */
 public class MetadataV3SerializerTest {
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Test
 	public void testCheckpointWithNoState() throws Exception {
 		final Random rnd = new Random();
@@ -51,7 +60,7 @@ public class MetadataV3SerializerTest {
 			final Collection<OperatorState> taskStates = Collections.emptyList();
 			final Collection<MasterState> masterStates = Collections.emptyList();
 
-			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+			testCheckpointSerialization(checkpointId, taskStates, masterStates, null);
 		}
 	}
 
@@ -69,12 +78,20 @@ public class MetadataV3SerializerTest {
 			final Collection<MasterState> masterStates =
 					CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
 
-			testCheckpointSerialization(checkpointId, operatorStates, masterStates);
+			testCheckpointSerialization(checkpointId, operatorStates, masterStates, null);
 		}
 	}
 
 	@Test
-	public void testCheckpointWithOnlyTaskState() throws Exception {
+	public void testCheckpointWithOnlyTaskStateForCheckpoint() throws Exception {
+		testCheckpointWithOnlyTaskState(null);
+	}
+	@Test
+	public void testCheckpointWithOnlyTaskStateForSavepoint() throws Exception {
+		testCheckpointWithOnlyTaskState(temporaryFolder.newFolder().toURI().toString());
+	}
+
+	private void testCheckpointWithOnlyTaskState(String basePath) throws Exception {
 		final Random rnd = new Random();
 		final int maxTaskStates = 20;
 		final int maxNumSubtasks = 20;
@@ -85,16 +102,25 @@ public class MetadataV3SerializerTest {
 			final int numTasks = rnd.nextInt(maxTaskStates) + 1;
 			final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1;
 			final Collection<OperatorState> taskStates =
-					CheckpointTestUtils.createOperatorStates(rnd, numTasks, numSubtasks);
+					CheckpointTestUtils.createOperatorStates(rnd, basePath, numTasks, numSubtasks);
 
 			final Collection<MasterState> masterStates = Collections.emptyList();
 
-			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+			testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
 		}
 	}
 
 	@Test
-	public void testCheckpointWithMasterAndTaskState() throws Exception {
+	public void testCheckpointWithMasterAndTaskStateForCheckpoint() throws Exception {
+		testCheckpointWithMasterAndTaskState(null);
+	}
+
+	@Test
+	public void testCheckpointWithMasterAndTaskStateForSavepoint() throws Exception {
+		testCheckpointWithMasterAndTaskState(temporaryFolder.newFolder().toURI().toString());
+	}
+
+	private void testCheckpointWithMasterAndTaskState(String basePath) throws Exception {
 		final Random rnd = new Random();
 
 		final int maxNumMasterStates = 5;
@@ -107,34 +133,48 @@ public class MetadataV3SerializerTest {
 			final int numTasks = rnd.nextInt(maxTaskStates) + 1;
 			final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1;
 			final Collection<OperatorState> taskStates =
-					CheckpointTestUtils.createOperatorStates(rnd, numTasks, numSubtasks);
+					CheckpointTestUtils.createOperatorStates(rnd, basePath, numTasks, numSubtasks);
 
 			final int numMasterStates = rnd.nextInt(maxNumMasterStates) + 1;
 			final Collection<MasterState> masterStates =
 					CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
 
-			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+			testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
 		}
 	}
 
+	/**
+	 * Test checkpoint metadata (de)serialization.
+	 *
+	 * @param checkpointId The checkpoint ID that is written into the metadata.
+	 * @param operatorStates The states of all operators.
+	 * @param masterStates The master states of the given checkpoint/savepoint.
+	 * @param basePath The savepoint directory, or {@code null} to test a checkpoint.
+	 */
 	private void testCheckpointSerialization(
 			long checkpointId,
 			Collection<OperatorState> operatorStates,
-			Collection<MasterState> masterStates) throws IOException {
+			Collection<MasterState> masterStates,
+			@Nullable String basePath) throws IOException {
 
 		MetadataV3Serializer serializer = MetadataV3Serializer.INSTANCE;
 
 		ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
 		DataOutputStream out = new DataOutputViewStreamWrapper(baos);
 
-		MetadataV3Serializer.serialize(new CheckpointMetadata(checkpointId, operatorStates, masterStates), out);
+		CheckpointMetadata metadata = new CheckpointMetadata(checkpointId, operatorStates, masterStates);
+		MetadataV3Serializer.serialize(metadata, out);
+		// for savepoints, MetadataV2V3SerializerBase must resolve the checkpoint pointer, so create an empty _metadata file
+		final Path metaPath = basePath == null ? null : new Path(basePath, "_metadata");
+		if (metaPath != null) {
+			metaPath.getFileSystem().create(metaPath, FileSystem.WriteMode.OVERWRITE).close(); // don't leak the stream
+		}
 		out.close();
 
 		byte[] bytes = baos.toByteArray();
 
 		DataInputStream in = new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(bytes));
-		CheckpointMetadata deserialized = serializer.deserialize(in, getClass().getClassLoader());
-
+		CheckpointMetadata deserialized = serializer.deserialize(in, getClass().getClassLoader(), basePath);
 		assertEquals(checkpointId, deserialized.getCheckpointId());
 		assertEquals(operatorStates, deserialized.getOperatorStates());
 
@@ -143,5 +183,8 @@ public class MetadataV3SerializerTest {
 				a.hasNext();) {
 			CheckpointTestUtils.assertMasterStateEquality(a.next(), b.next());
 		}
+		if (metaPath != null) {
+			metaPath.getFileSystem().delete(metaPath, true);
+		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
index 08d6874..6e7ed35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
@@ -264,7 +264,7 @@ public class IncrementalRemoteKeyedStateHandleTest {
 			1L,
 			placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
 			placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
-			spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd)));
+			spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd, null)));
 	}
 
 	private static Map<StateHandleID, StreamStateHandle> placeSpies(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index c06cdfa..d3c9d15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -32,6 +33,8 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.ArgumentCaptor;
 
 import java.io.DataInputStream;
@@ -40,6 +43,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 import java.util.function.Supplier;
 
@@ -62,8 +67,17 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for the {@link FsCheckpointStateOutputStream}.
  */
+@RunWith(Parameterized.class)
 public class FsCheckpointStateOutputStreamTest {
 
+	@Parameterized.Parameters(name = "scope = {0}")
+	public static List<CheckpointedStateScope> parameters() {
+		return Arrays.asList(CheckpointedStateScope.values());
+	}
+
+	@Parameterized.Parameter
+	public CheckpointedStateScope scope;
+
 	@Rule
 	public final TemporaryFolder tempDir = new TemporaryFolder();
 
@@ -71,14 +85,14 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testWrongParameters() throws Exception {
 		// this should fail
 		new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-			Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000);
+			Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, scope);
 	}
 
 	@Test
 	public void testEmptyState() throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 				new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512);
+						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
 
 		StreamStateHandle handle = stream.closeAndGetHandle();
 		assertTrue(handle == null);
@@ -108,7 +122,7 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testGetPos() throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 				new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17);
+						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
 
 		for (int i = 0; i < 64; ++i) {
 			Assert.assertEquals(i, stream.getPos());
@@ -120,7 +134,7 @@ public class FsCheckpointStateOutputStreamTest {
 		// ----------------------------------------------------
 
 		stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17);
+				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
 
 		byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
@@ -150,7 +164,8 @@ public class FsCheckpointStateOutputStreamTest {
 			Path.fromLocalFile(tempDir.newFolder()),
 			fs,
 			4,
-			0);
+			0,
+			scope);
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -179,7 +194,8 @@ public class FsCheckpointStateOutputStreamTest {
 			Path.fromLocalFile(tempDir.newFolder()),
 			fs,
 			4,
-			0);
+			0,
+			scope);
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -199,7 +215,7 @@ public class FsCheckpointStateOutputStreamTest {
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 			new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-					Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold);
+					Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, scope);
 
 		Random rnd = new Random();
 		byte[] original = new byte[numBytes];
@@ -248,7 +264,7 @@ public class FsCheckpointStateOutputStreamTest {
 	@Test
 	public void testWriteFailsFastWhenClosed() throws Exception {
 		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512);
+				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
 
 		assertFalse(stream.isClosed());
 
@@ -287,7 +303,7 @@ public class FsCheckpointStateOutputStreamTest {
 		final Path basePath = Path.fromLocalFile(directory);
 
 		final Supplier<CheckpointStateOutputStream> factory = () ->
-				new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15);
+				new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, scope);
 
 		CheckpointStateOutputStream stream1 = factory.get();
 		CheckpointStateOutputStream stream2 = factory.get();
@@ -356,10 +372,10 @@ public class FsCheckpointStateOutputStreamTest {
 		FileSystem fs = spy(FileSystem.getLocalFileSystem());
 
 		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(directory), fs, 1024, 1);
+				Path.fromLocalFile(directory), fs, 1024, 1, scope);
 
 		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(directory), fs, 1024, 1);
+				Path.fromLocalFile(directory), fs, 1024, 1, scope);
 
 		stream1.write(new byte[61]);
 		stream2.write(new byte[61]);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 973383c..1f32901 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -178,16 +178,16 @@ public class StreamTaskStateInitializerImplTest {
 					new OperatorStateHandle.StateMetaInfo(
 						new long[]{0, 10},
 						OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)),
-				CheckpointTestUtils.createDummyStreamStateHandle(random)),
+				CheckpointTestUtils.createDummyStreamStateHandle(random, null)),
 			new OperatorStreamStateHandle(
 				Collections.singletonMap(
 					"_default_",
 					new OperatorStateHandle.StateMetaInfo(
 						new long[]{0, 20, 30},
 						OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)),
-				CheckpointTestUtils.createDummyStreamStateHandle(random)),
-			CheckpointTestUtils.createDummyKeyGroupStateHandle(random),
-			CheckpointTestUtils.createDummyKeyGroupStateHandle(random),
+				CheckpointTestUtils.createDummyStreamStateHandle(random, null)),
+			CheckpointTestUtils.createDummyKeyGroupStateHandle(random, null),
+			CheckpointTestUtils.createDummyKeyGroupStateHandle(random, null),
 			singleton(createNewInputChannelStateHandle(10, random)),
 			singleton(createNewResultSubpartitionStateHandle(10, random)));
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index dd8fa08..2b812ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -127,7 +127,7 @@ public class OperatorSnapshotUtil {
 			final int v = dis.readInt();
 
 			// still required for compatibility to consume the bytes.
-			MetadataV3Serializer.deserializeStreamStateHandle(dis);
+			MetadataV3Serializer.deserializeStreamStateHandle(dis, null);
 
 			List<OperatorStateHandle> rawOperatorState = null;
 			int numRawOperatorStates = dis.readInt();
@@ -135,7 +135,7 @@ public class OperatorSnapshotUtil {
 				rawOperatorState = new ArrayList<>();
 				for (int i = 0; i < numRawOperatorStates; i++) {
 					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandle(
-						dis);
+						dis, null);
 					rawOperatorState.add(operatorState);
 				}
 			}
@@ -146,7 +146,7 @@ public class OperatorSnapshotUtil {
 				managedOperatorState = new ArrayList<>();
 				for (int i = 0; i < numManagedOperatorStates; i++) {
 					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandle(
-						dis);
+						dis, null);
 					managedOperatorState.add(operatorState);
 				}
 			}
@@ -157,7 +157,7 @@ public class OperatorSnapshotUtil {
 				rawKeyedState = new ArrayList<>();
 				for (int i = 0; i < numRawKeyedStates; i++) {
 					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandle(
-						dis);
+						dis, null);
 					rawKeyedState.add(keyedState);
 				}
 			}
@@ -168,7 +168,7 @@ public class OperatorSnapshotUtil {
 				managedKeyedState = new ArrayList<>();
 				for (int i = 0; i < numManagedKeyedStates; i++) {
 					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandle(
-						dis);
+						dis, null);
 					managedKeyedState.add(keyedState);
 				}
 			}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 7393442..be9b706 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -156,6 +156,26 @@ public class SavepointITCase extends TestLogger {
 	}
 
 	@Test
+	public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath() throws Exception {
+		final int numTaskManagers = 2;
+		final int numSlotsPerTaskManager = 2;
+		final int parallelism = numTaskManagers * numSlotsPerTaskManager;
+
+		final MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(
+			numTaskManagers,
+			numSlotsPerTaskManager,
+			getFileBasedCheckpointsConfig());
+
+		final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
+		final org.apache.flink.core.fs.Path oldPath = new org.apache.flink.core.fs.Path(savepointPath);
+		final org.apache.flink.core.fs.Path newPath = new org.apache.flink.core.fs.Path(folder.newFolder().toURI().toString());
+		// move the whole savepoint directory; restoring from the new location proves it is relocatable
+		org.junit.Assert.assertTrue("Failed to move savepoint to " + newPath, oldPath.getFileSystem().rename(oldPath, newPath));
+		verifySavepoint(parallelism, newPath.toUri().toString());
+		restoreJobAndVerifyState(newPath.toUri().toString(), clusterFactory, parallelism);
+	}
+
+	@Test
 	public void testShouldAddEntropyToSavepointPath() throws Exception {
 		final int numTaskManagers = 2;
 		final int numSlotsPerTaskManager = 2;
@@ -219,9 +239,12 @@ public class SavepointITCase extends TestLogger {
 		}
 	}
 
-	private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+	private void restoreJobAndVerifyState(
+		String savepointPath,
+		MiniClusterResourceFactory clusterFactory,
+		int parallelism) throws Exception {
 		final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
-		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
 		final JobID jobId = jobGraph.getJobID();
 		StatefulCounter.resetForTest(parallelism);
 


[flink] 14/14: [FLINK-17699][DataStream API] Initialize SourceOperator more eagerly and reduce scope of collaborators.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 939625f2c84bdce6872548d3df672f492e33a704
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 23:07:16 2020 +0200

    [FLINK-17699][DataStream API] Initialize SourceOperator more eagerly and reduce scope of collaborators.
    
    This reduces the scope of necessary mocking in the tests and of special-casing in the setup logic.
    
      - This removes the dependency on Source and replaces it with a reader factory
      - This lets the SourceOperator register itself at the OperatorEventDispatcher
---
 .../streaming/api/operators/SourceOperator.java    | 45 ++++++++++------
 .../api/operators/SourceOperatorFactory.java       | 62 ++++++++++++++++------
 .../api/operators/SourceOperatorTest.java          | 39 +++++++-------
 3 files changed, 94 insertions(+), 52 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 3af714d..e4971d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -45,6 +44,9 @@ import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base source operator only used for integrating the source reader which is proposed by FLIP-27. It implements
@@ -67,24 +69,38 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
 			new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
 
-	private final Source<OUT, SplitT, ?> source;
+	/** The factory for the source reader. This is a workaround, because currently the SourceReader
+	 * must be lazily initialized, which is mainly because the metric group that the reader relies on is
+	 * lazily initialized. */
+	private final Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory;
 
+	/** The serializer for the splits, applied to the split types before storing them in the reader state. */
 	private final SimpleVersionedSerializer<SplitT> splitSerializer;
 
-	// Fields that will be setup at runtime.
-	private transient SourceReader<OUT, SplitT> sourceReader;
-	private transient ListState<SplitT> readerState;
-	private transient OperatorEventGateway operatorEventGateway;
+	/** The event gateway through which this operator talks to its coordinator. */
+	private final OperatorEventGateway operatorEventGateway;
+
+	// ---- lazily initialized fields ----
+
+	/** The source reader that does most of the work. */
+	private SourceReader<OUT, SplitT> sourceReader;
+
+	/** The state that holds the currently assigned splits. */
+	private ListState<SplitT> readerState;
 
-	public SourceOperator(Source<OUT, SplitT, ?> source) {
-		this.source = source;
-		this.splitSerializer = source.getSplitSerializer();
+	public SourceOperator(
+			Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory,
+			OperatorEventGateway operatorEventGateway,
+			SimpleVersionedSerializer<SplitT> splitSerializer) {
+
+		this.readerFactory = checkNotNull(readerFactory);
+		this.operatorEventGateway = checkNotNull(operatorEventGateway);
+		this.splitSerializer = checkNotNull(splitSerializer);
 	}
 
 	@Override
 	public void open() throws Exception {
-		// Create the source reader.
-		SourceReaderContext context = new SourceReaderContext() {
+		final SourceReaderContext context = new SourceReaderContext() {
 			@Override
 			public MetricGroup metricGroup() {
 				return getRuntimeContext().getMetricGroup();
@@ -95,7 +111,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 				operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
 			}
 		};
-		sourceReader = source.createReader(context);
+
+		sourceReader = readerFactory.apply(context);
 
 		// restore the state if necessary.
 		final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
@@ -142,10 +159,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 		readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
 	}
 
-	public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
-		this.operatorEventGateway = operatorEventGateway;
-	}
-
 	@SuppressWarnings("unchecked")
 	public void handleOperatorEvent(OperatorEvent event) {
 		if (event instanceof AddSplitEvent) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 79f6453..c30a0a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -19,12 +19,17 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
 
+import java.util.function.Function;
+
 /**
  * The Factory class for {@link SourceOperator}.
  */
@@ -39,9 +44,6 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	/** The number of worker thread for the source coordinator. */
 	private final int numCoordinatorWorkerThread;
 
-	/** The {@link OperatorEventDispatcher} to register the SourceOperator. */
-	private OperatorEventDispatcher operatorEventDispatcher;
-
 	public SourceOperatorFactory(Source<OUT, ?, ?> source) {
 		this(source, 1);
 	}
@@ -52,15 +54,23 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
-		SourceOperator<OUT, ?> sourceOperator = new SourceOperator<>(source);
-		OperatorEventGateway operatorEventGateway = operatorEventDispatcher.registerEventHandler(
-				parameters.getStreamConfig().getOperatorID(),
-				sourceOperator);
-		sourceOperator.setOperatorEventGateway(operatorEventGateway);
+		final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+		final OperatorEventGateway gateway = parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+		final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
+				source::createReader,
+				gateway,
+				source.getSplitSerializer());
+
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
-		return (T) sourceOperator;
+		parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
+
+		// unchecked cast from SourceOperator<OUT, ?> to the operator type T requested by the caller
+		@SuppressWarnings("unchecked")
+		final T castedOperator = (T) sourceOperator;
+
+		return castedOperator;
 	}
 
 	@Override
@@ -68,11 +78,7 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 		return new SourceCoordinatorProvider<>(operatorName, operatorID, source, numCoordinatorWorkerThread);
 	}
 
-	@Override
-	public void setOperatorEventDispatcher(OperatorEventDispatcher operatorEventDispatcher) {
-		this.operatorEventDispatcher = operatorEventDispatcher;
-	}
-
+	@SuppressWarnings("rawtypes")
 	@Override
 	public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
 		return SourceOperator.class;
@@ -82,4 +88,28 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	public boolean isStreamSource() {
 		return true;
 	}
+
+	/**
+	 * This is a utility method to conjure up a "SplitT" generics variable binding so that we can
+	 * construct the SourceOperator without resorting to "all raw types".
+	 * That way, this method puts all "type non-safety" in one place and allows maintaining as much
+	 * generics safety in the main code as possible.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
+			Function<SourceReaderContext, SourceReader<T, ?>> readerFactory,
+			OperatorEventGateway eventGateway,
+			SimpleVersionedSerializer<?> splitSerializer) {
+
+		// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
+		final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
+				(Function<SourceReaderContext, SourceReader<T, SplitT>>) (Function<?, ?>) readerFactory;
+
+		final SimpleVersionedSerializer<SplitT> typedSplitSerializer = (SimpleVersionedSerializer<SplitT>) splitSerializer;
+
+		return new SourceOperator<>(
+				typedReaderFactory,
+				eventGateway,
+				typedSplitSerializer);
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 653af8b..34b7962 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -19,11 +19,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
@@ -31,6 +28,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -61,20 +59,18 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class SourceOperatorTest {
 
-	private static final int NUM_SPLITS = 5;
 	private static final int SUBTASK_INDEX = 1;
 	private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
 
-	private MockSource source;
+	private MockSourceReader mockSourceReader;
 	private MockOperatorEventGateway mockGateway;
 	private SourceOperator<Integer, MockSourceSplit> operator;
 
 	@Before
 	public void setup() {
-		this.source = new MockSource(Boundedness.BOUNDED, NUM_SPLITS);
-		this.operator = new TestingSourceOperator<>(source, SUBTASK_INDEX);
+		this.mockSourceReader = new MockSourceReader();
 		this.mockGateway = new MockOperatorEventGateway();
-		this.operator.setOperatorEventGateway(mockGateway);
+		this.operator = new TestingSourceOperator<>(mockSourceReader, mockGateway, SUBTASK_INDEX);
 	}
 
 	@Test
@@ -91,9 +87,7 @@ public class SourceOperatorTest {
 		operator.initializeState(getStateContext());
 		// Open the operator.
 		operator.open();
-		// A source reader should have been created.
-		assertEquals(1, source.getCreatedReaders().size());
-		MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+
 		// The source reader should have been assigned a split.
 		assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits());
 		// The source reader should have started.
@@ -112,8 +106,7 @@ public class SourceOperatorTest {
 		operator.open();
 		MockSourceSplit newSplit = new MockSourceSplit((2));
 		operator.handleOperatorEvent(new AddSplitEvent<>(Collections.singletonList(newSplit)));
-		// The source reader should bave been assigned two splits.
-		MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+		// The source reader should have been assigned two splits.
 		assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
 	}
 
@@ -123,8 +116,7 @@ public class SourceOperatorTest {
 		operator.open();
 		SourceEvent event = new SourceEvent() {};
 		operator.handleOperatorEvent(new SourceEventWrapper(event));
-		// The source reader should bave been assigned two splits.
-		MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+		// The source reader should have received the source event.
 		assertEquals(Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents());
 	}
 
@@ -179,13 +171,20 @@ public class SourceOperatorTest {
 	/**
 	 * A testing class that overrides the getRuntimeContext() Method.
 	 */
-	private static class TestingSourceOperator<OUT, SplitT extends SourceSplit>
-			extends SourceOperator<OUT, SplitT> {
+	private static class TestingSourceOperator<OUT> extends SourceOperator<OUT, MockSourceSplit> {
 
 		private final int subtaskIndex;
 
-		TestingSourceOperator(Source<OUT, SplitT, ?> source, int subtaskIndex) {
-			super(source);
+		TestingSourceOperator(
+				SourceReader<OUT, MockSourceSplit> reader,
+				OperatorEventGateway eventGateway,
+				int subtaskIndex) {
+
+			super(
+					(context) -> reader,
+					eventGateway,
+					new MockSourceSplitSerializer());
+
 			this.subtaskIndex = subtaskIndex;
 		}
 


[flink] 13/14: [FLINK-17696][streaming runtime] Add CoordinatorEventDispatcher to StreamOperatorParameters

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c9ed3d846435ef36532e9f193327b30c17b5e11
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 22:43:00 2020 +0200

    [FLINK-17696][streaming runtime] Add CoordinatorEventDispatcher to StreamOperatorParameters
    
    This supports more eager initialization of operators that depend on the OperatorEventDispatcher.
---
 .../coordination/OperatorEventDispatcher.java          | 16 +++++++++++++---
 .../api/operators/CoordinatedOperatorFactory.java      | 12 ------------
 .../api/operators/StreamOperatorFactoryUtil.java       | 11 ++---------
 .../api/operators/StreamOperatorParameters.java        | 10 +++++++++-
 .../operators/collect/CollectSinkOperatorFactory.java  | 18 ++++++------------
 .../runtime/tasks/OperatorEventDispatcherImpl.java     | 14 +++++++-------
 6 files changed, 37 insertions(+), 44 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
index db9ff94..b9d2648 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
@@ -21,10 +21,20 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
 /**
- * The dispatcher through which Operators receive operator events and through which they can send operator
- * events back to the coordinator.
+ * The dispatcher through which Operators receive {@link OperatorEvent}s and through which they can
+ * send OperatorEvents back to the {@code OperatorCoordinator}.
  */
 public interface OperatorEventDispatcher {
 
-	OperatorEventGateway registerEventHandler(OperatorID operator, OperatorEventHandler handler);
+	/**
+	 * Register a listener that is notified every time an OperatorEvent is sent from the
+	 * OperatorCoordinator (of the operator with the given OperatorID) to this subtask.
+	 */
+	void registerEventHandler(OperatorID operator, OperatorEventHandler handler);
+
+	/**
+	 * Gets the gateway through which events can be passed to the OperatorCoordinator for
+	 * the operator identified by the given OperatorID.
+	 */
+	OperatorEventGateway getOperatorEventGateway(OperatorID operatorId);
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java
index dab5bf3..21bda71 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 
 /**
  * A factory class for the {@link StreamOperator}s implementing
@@ -43,15 +42,4 @@ public interface CoordinatedOperatorFactory<OUT> extends StreamOperatorFactory<O
 	 * @return the provider of the {@link OperatorCoordinator} for this operator.
 	 */
 	OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID);
-
-	/**
-	 * Sets the {@link OperatorEventDispatcher} for registering the
-	 * {@link org.apache.flink.runtime.operators.coordination.OperatorEventHandler OperaterEventHandler} and setup the
-	 * {@link org.apache.flink.runtime.operators.coordination.OperatorEventGateway OperatorEventGateway} for the
-	 * SourceOperator to send events to the operator coordinator. This method will be invoked before
-	 * {@link #createStreamOperator(StreamOperatorParameters)} is invoked.
-	 *
-	 * @param operatorEventDispatcher the {@link OperatorEventDispatcher} to register the
-	 */
-	void setOperatorEventDispatcher(OperatorEventDispatcher operatorEventDispatcher);
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
index 91b09ac..fb0917f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
@@ -24,7 +24,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.Preconditions;
 
 import java.util.Optional;
 
@@ -61,20 +60,14 @@ public class StreamOperatorFactoryUtil {
 			((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
 		}
 
-		if (operatorFactory instanceof CoordinatedOperatorFactory) {
-			Preconditions.checkNotNull(
-					operatorEventDispatcher,
-					"The OperatorEventDispatcher should not be null.");
-			((CoordinatedOperatorFactory<OUT>) operatorFactory).setOperatorEventDispatcher(operatorEventDispatcher);
-		}
-
 		// TODO: what to do with ProcessingTimeServiceAware?
 		OP op = operatorFactory.createStreamOperator(
 			new StreamOperatorParameters<>(
 				containingTask,
 				configuration,
 				output,
-				processingTimeService));
+				processingTimeService,
+				operatorEventDispatcher));
 		return new Tuple2<>(op, Optional.ofNullable(processingTimeService));
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
index 5aaffcc..70df0e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -37,16 +38,19 @@ public class StreamOperatorParameters<OUT> {
 	private final StreamConfig config;
 	private final Output<StreamRecord<OUT>> output;
 	private final ProcessingTimeService processingTimeService;
+	private final OperatorEventDispatcher operatorEventDispatcher;
 
 	public StreamOperatorParameters(
 			StreamTask<?, ?> containingTask,
 			StreamConfig config,
 			Output<StreamRecord<OUT>> output,
-			ProcessingTimeService processingTimeService) {
+			ProcessingTimeService processingTimeService,
+			OperatorEventDispatcher operatorEventDispatcher) {
 		this.containingTask = containingTask;
 		this.config = config;
 		this.output = output;
 		this.processingTimeService = processingTimeService;
+		this.operatorEventDispatcher = operatorEventDispatcher;
 	}
 
 	public StreamTask<?, ?> getContainingTask() {
@@ -64,4 +68,8 @@ public class StreamOperatorParameters<OUT> {
 	public ProcessingTimeService getProcessingTimeService() {
 		return processingTimeService;
 	}
+
+	public OperatorEventDispatcher getOperatorEventDispatcher() {
+		return operatorEventDispatcher;
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
index 7e9da6e..8fa9843 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators.collect;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -36,8 +35,6 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
 
 	private final CollectSinkOperator<?> operator;
 
-	private OperatorEventDispatcher operatorEventDispatcher;
-
 	public CollectSinkOperatorFactory(CollectSinkOperator<?> operator) {
 		super(operator);
 		this.operator = operator;
@@ -45,11 +42,13 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
 
 	@Override
 	public <T extends StreamOperator<Object>> T  createStreamOperator(StreamOperatorParameters<Object> parameters) {
-		OperatorEventGateway operatorEventGateway = operatorEventDispatcher.registerEventHandler(
-			parameters.getStreamConfig().getOperatorID(),
-			operator);
-		operator.setOperatorEventGateway(operatorEventGateway);
+		final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+		final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
+
+		operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorId));
 		operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+		eventDispatcher.registerEventHandler(operatorId, operator);
+
 		return (T) operator;
 	}
 
@@ -58,9 +57,4 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
 		operator.getOperatorIdFuture().complete(operatorID);
 		return new CollectSinkOperatorCoordinator.Provider(operatorID);
 	}
-
-	@Override
-	public void setOperatorEventDispatcher(OperatorEventDispatcher operatorEventDispatcher) {
-		this.operatorEventDispatcher = operatorEventDispatcher;
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
index 3407d5b..e6b6ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
@@ -74,18 +74,18 @@ final class OperatorEventDispatcherImpl implements OperatorEventDispatcher {
 	}
 
 	@Override
-	public OperatorEventGateway registerEventHandler(OperatorID operator, OperatorEventHandler handler) {
-		final OperatorEventGateway gateway = new OperatorEventGatewayImpl(toCoordinator, operator);
+	public void registerEventHandler(OperatorID operator, OperatorEventHandler handler) {
 		final OperatorEventHandler prior = handlers.putIfAbsent(operator, handler);
-
-		if (prior == null) {
-			return gateway;
-		}
-		else {
+		if (prior != null) {
 			throw new IllegalStateException("already a handler registered for this operatorId");
 		}
 	}
 
+	@Override
+	public OperatorEventGateway getOperatorEventGateway(OperatorID operatorId) {
+		return new OperatorEventGatewayImpl(toCoordinator, operatorId);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class OperatorEventGatewayImpl implements OperatorEventGateway {


[flink] 09/14: [FLINK-10740][DataStream API] Add a utility SimpleVersionedListState

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1cceb3ee3f109aefaeec440a87651053d0f4c191
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 20:04:42 2020 +0200

    [FLINK-10740][DataStream API] Add a utility SimpleVersionedListState
---
 .../operators/util/SimpleVersionedListState.java   | 142 +++++++++++++++++++++
 1 file changed, 142 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/util/SimpleVersionedListState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/util/SimpleVersionedListState.java
new file mode 100644
index 0000000..f52fc42
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/util/SimpleVersionedListState.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.util;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ListState} that uses a {@link SimpleVersionedSerializer} instead of a {@link TypeSerializer}.
+ *
+ * <p>The state wraps a {@link ListState} of type {@code byte[]}, meaning it internally keeps only bytes
+ * and lazily deserializes them into objects. This has two major implications, compared to a {@code ListState}
+ * that uses a {@code TypeSerializer}:
+ *
+ * <ul>
+ *     <li>This state does not participate in <i>state migration</i>. The bytes are never converted and
+ *         different state versions are lazily resolved by the versioned serializer.
+ *     <li>This state is generally slower than states that directly use the {@code TypeSerializer}, because
+ *         of extra copies into byte arrays and extra version encodings.
+ * </ul>
+ *
+ * @param <T> The type of the objects stored in the state.
+ */
+public class SimpleVersionedListState<T> implements ListState<T> {
+
+	/** The wrapped state holding the serialized bytes of each element. */
+	private final ListState<byte[]> rawState;
+
+	/** Serializer used to convert between elements and their byte representation. */
+	private final SimpleVersionedSerializer<T> serializer;
+
+	/**
+	 * Creates a new SimpleVersionedListState that reads and writes bytes from the given raw ListState
+	 * with the given serializer.
+	 *
+	 * @param rawState The state that stores the serialized elements; must not be null.
+	 * @param serializer The serializer for the elements; must not be null.
+	 */
+	public SimpleVersionedListState(ListState<byte[]> rawState, SimpleVersionedSerializer<T> serializer) {
+		this.rawState = checkNotNull(rawState);
+		this.serializer = checkNotNull(serializer);
+	}
+
+	@Override
+	public void update(@Nullable List<T> values) throws Exception {
+		// a null list is passed through unchanged, leaving its interpretation to the raw state
+		rawState.update(serializeAll(values));
+	}
+
+	@Override
+	public void addAll(@Nullable List<T> values) throws Exception {
+		rawState.addAll(serializeAll(values));
+	}
+
+	/**
+	 * Returns a view of the elements that deserializes each element lazily, during iteration.
+	 *
+	 * <p>If the raw state returns {@code null} (which the {@code ListState#get()} contract permits for
+	 * an empty state), this method also returns {@code null}, instead of a view whose {@code iterator()}
+	 * would fail with a {@code NullPointerException}.
+	 *
+	 * <p>Elements that fail to deserialize cause the iterator's {@code next()} method to throw a
+	 * {@link FlinkRuntimeException}.
+	 */
+	@Nullable
+	@Override
+	public Iterable<T> get() throws Exception {
+		final Iterable<byte[]> rawIterable = rawState.get();
+		if (rawIterable == null) {
+			return null;
+		}
+
+		// copy to a local so the returned lambda captures only locals, not this state object
+		final SimpleVersionedSerializer<T> serializer = this.serializer;
+		return () -> new DeserializingIterator<>(rawIterable.iterator(), serializer);
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		rawState.add(serialize(value));
+	}
+
+	@Override
+	public void clear() {
+		rawState.clear();
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
+	/** Serializes a single value together with the version of the serializer. */
+	private byte[] serialize(T value) throws IOException {
+		return SimpleVersionedSerialization.writeVersionAndSerialize(serializer, value);
+	}
+
+	/** Serializes all given values, or returns {@code null} if the list itself is {@code null}. */
+	@Nullable
+	private List<byte[]> serializeAll(@Nullable List<T> values) throws IOException {
+		if (values == null) {
+			return null;
+		}
+
+		final List<byte[]> rawValues = new ArrayList<>(values.size());
+		for (T value : values) {
+			rawValues.add(serialize(value));
+		}
+		return rawValues;
+	}
+
+	/**
+	 * Iterator that deserializes the elements of the raw iterator one by one, as they are requested.
+	 * Removal is not supported.
+	 */
+	private static final class DeserializingIterator<T> implements Iterator<T> {
+
+		private final Iterator<byte[]> rawIterator;
+		private final SimpleVersionedSerializer<T> serializer;
+
+		private DeserializingIterator(Iterator<byte[]> rawIterator, SimpleVersionedSerializer<T> serializer) {
+			this.rawIterator = rawIterator;
+			this.serializer = serializer;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return rawIterator.hasNext();
+		}
+
+		@Override
+		public T next() {
+			final byte[] bytes = rawIterator.next();
+			try {
+				return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+			}
+			catch (IOException e) {
+				throw new FlinkRuntimeException("Failed to deserialize value", e);
+			}
+		}
+	}
+}