You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:10 UTC

[flink] 06/14: [FLINK-5763][state backends] (follow-up) Rework MetaData Serializers and externalPointer passing

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6bc0d5b845f7ade465e69c30e07be2947bb4b5e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat May 16 01:16:50 2020 +0200

    [FLINK-5763][state backends] (follow-up) Rework MetaData Serializers and externalPointer passing
    
    This fix has several goals:
    
    (1) Change the pointer / path resolution parsing from a static variable to a parameter that is passed.
    The serializers are singleton instances and currently assume to be usable in a multi-threaded manner.
    The static variables prevent this, usng the context object parameter restores this behavior.
    
    (2) Lower level serialization methods should not expose themselves directly to the tests.
    This methods makes (almost) all lower level serialization methods instance methods and package-private.
    Static access methods are gathered in one place, as a workaround for tests that require accesd to
    those lower level methods (even through they should not).
    
    (3) With more unified access to the methods from tests, we can now make prevent that tests need to
    be aware of the the context object or external pointer parameter.
    
    (4) Minor cosmetic cleanups around method grouping.
---
 .../org/apache/flink/core/fs/EntropyInjector.java  |   2 +-
 .../metadata/ChannelStateHandleSerializer.java     |  22 +-
 .../checkpoint/metadata/MetadataV2Serializer.java  |  10 +-
 .../metadata/MetadataV2V3SerializerBase.java       | 266 +++++++++++++--------
 .../checkpoint/metadata/MetadataV3Serializer.java  |  76 +++++-
 .../filesystem/AbstractFsCheckpointStorage.java    |   2 +-
 .../flink/streaming/util/OperatorSnapshotUtil.java |  24 +-
 7 files changed, 267 insertions(+), 135 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index 1f67b34..e4e2078 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -94,7 +94,7 @@ public class EntropyInjector {
 	}
 
 	@Nullable
-	public static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
+	private static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
 		if (fs instanceof EntropyInjectingFileSystem) {
 			return (EntropyInjectingFileSystem) fs;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
index f7b2c8f..30a4a30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/ChannelStateHandleSerializer.java
@@ -45,11 +45,15 @@ class ChannelStateHandleSerializer {
 		});
 	}
 
-	ResultSubpartitionStateHandle deserializeResultSubpartitionStateHandle(DataInputStream dis) throws IOException {
+	ResultSubpartitionStateHandle deserializeResultSubpartitionStateHandle(
+			DataInputStream dis,
+			MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
+
 		return deserializeChannelStateHandle(
 			is -> new ResultSubpartitionInfo(is.readInt(), is.readInt()),
 			(streamStateHandle, longs, info) -> new ResultSubpartitionStateHandle(info, streamStateHandle, longs),
-			dis);
+			dis,
+			context);
 	}
 
 	public void serialize(InputChannelStateHandle handle, DataOutputStream dos) throws IOException {
@@ -59,11 +63,15 @@ class ChannelStateHandleSerializer {
 		});
 	}
 
-	InputChannelStateHandle deserializeInputChannelStateHandle(DataInputStream dis) throws IOException {
+	InputChannelStateHandle deserializeInputChannelStateHandle(
+			DataInputStream dis,
+			MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
+
 		return deserializeChannelStateHandle(
 			is -> new InputChannelInfo(is.readInt(), is.readInt()),
 			(streamStateHandle, longs, inputChannelInfo) -> new InputChannelStateHandle(inputChannelInfo, streamStateHandle, longs),
-			dis);
+			dis,
+			context);
 	}
 
 	private static <I> void serializeChannelStateHandle(
@@ -81,13 +89,15 @@ class ChannelStateHandleSerializer {
 	private static <Info, Handle extends AbstractChannelStateHandle<Info>> Handle deserializeChannelStateHandle(
 			FunctionWithException<DataInputStream, Info, IOException> infoReader,
 			TriFunctionWithException<StreamStateHandle, List<Long>, Info, Handle, IOException> handleBuilder,
-			DataInputStream dis) throws IOException {
+			DataInputStream dis,
+			MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
+
 		final Info info = infoReader.apply(dis);
 		int offsetsSize = dis.readInt();
 		final List<Long> offsets = new ArrayList<>(offsetsSize);
 		for (int i = 0; i < offsetsSize; i++) {
 			offsets.add(dis.readLong());
 		}
-		return handleBuilder.apply(deserializeStreamStateHandle(dis, null), offsets, info);
+		return handleBuilder.apply(deserializeStreamStateHandle(dis, context), offsets, info);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
index e02505a..fdf4392 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
+import javax.annotation.Nullable;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -89,7 +91,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorState deserializeOperatorState(DataInputStream dis, String externalPointer) throws IOException {
+	protected OperatorState deserializeOperatorState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
 		final OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
 		final int parallelism = dis.readInt();
 		final int maxParallelism = dis.readInt();
@@ -106,7 +108,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 
 		for (int j = 0; j < numSubTaskStates; j++) {
 			final int subtaskIndex = dis.readInt();
-			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, externalPointer);
+			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, context);
 			taskState.putState(subtaskIndex, subtaskState);
 		}
 
@@ -125,7 +127,7 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, String externalPointer) throws IOException {
+	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
 		// read two unused fields for compatibility:
 		//   - "duration"
 		//   - number of legacy states
@@ -138,6 +140,6 @@ public class MetadataV2Serializer extends MetadataV2V3SerializerBase implements
 					"no longer supported.");
 		}
 
-		return super.deserializeSubtaskState(dis, externalPointer);
+		return super.deserializeSubtaskState(dis, context);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 2ea18be..fe8343a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint.metadata;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -39,11 +38,10 @@ import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.function.BiConsumerWithException;
-import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
 
 import javax.annotation.Nullable;
 
@@ -61,8 +59,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * Base (De)serializer for checkpoint metadata format version 2 and 3.
  *
@@ -95,9 +91,6 @@ public abstract class MetadataV2V3SerializerBase {
 	private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
 	private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
 
-	private static Path exclusiveCheckpointDir = null;
-	private static String previousExternalPointer = null;
-
 	// ------------------------------------------------------------------------
 	//  (De)serialization entry points
 	// ------------------------------------------------------------------------
@@ -122,7 +115,13 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	protected CheckpointMetadata deserializeMetadata(DataInputStream dis, String externalPointer) throws IOException {
+	protected CheckpointMetadata deserializeMetadata(
+			DataInputStream dis,
+			@Nullable String externalPointer) throws IOException {
+
+		final DeserializationContext context = externalPointer == null
+				? null : new DeserializationContext(externalPointer);
+
 		// first: checkpoint ID
 		final long checkpointId = dis.readLong();
 		if (checkpointId < 0) {
@@ -151,7 +150,7 @@ public abstract class MetadataV2V3SerializerBase {
 		final List<OperatorState> operatorStates = new ArrayList<>(numTaskStates);
 
 		for (int i = 0; i < numTaskStates; i++) {
-			operatorStates.add(deserializeOperatorState(dis, externalPointer));
+			operatorStates.add(deserializeOperatorState(dis, context));
 		}
 
 		return new CheckpointMetadata(checkpointId, operatorStates, masterStates);
@@ -220,15 +219,11 @@ public abstract class MetadataV2V3SerializerBase {
 
 	protected abstract void serializeOperatorState(OperatorState operatorState, DataOutputStream dos) throws IOException;
 
-	protected abstract OperatorState deserializeOperatorState(DataInputStream dis, String deserializeOperatorState) throws IOException;
-
-	// ------------------------------------------------------------------------
-	//  operator subtask state (de)serialization methods
-	// ------------------------------------------------------------------------
+	protected abstract OperatorState deserializeOperatorState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException;
 
 	protected void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException {
-		serializeSingleton(subtaskState.getManagedOperatorState(), dos, MetadataV2V3SerializerBase::serializeOperatorStateHandle);
-		serializeSingleton(subtaskState.getRawOperatorState(), dos, MetadataV2V3SerializerBase::serializeOperatorStateHandle);
+		serializeSingleton(subtaskState.getManagedOperatorState(), dos, this::serializeOperatorStateHandle);
+		serializeSingleton(subtaskState.getRawOperatorState(), dos, this::serializeOperatorStateHandle);
 		serializeKeyedStateCol(subtaskState.getManagedKeyedState(), dos);
 		serializeKeyedStateCol(subtaskState.getRawKeyedState(), dos);
 	}
@@ -237,32 +232,22 @@ public abstract class MetadataV2V3SerializerBase {
 		serializeKeyedStateHandle(extractSingleton(managedKeyedState), dos);
 	}
 
-	private <T extends StateObject> void serializeSingleton(
-			StateObjectCollection<T> stateObjectCollection,
-			DataOutputStream dos,
-			BiConsumerWithException<T, DataOutputStream, IOException> cons) throws IOException {
-		final T state = extractSingleton(stateObjectCollection);
-		if (state != null) {
-			dos.writeInt(1);
-			cons.accept(state, dos);
-		} else {
-			dos.writeInt(0);
-		}
-	}
+	protected OperatorSubtaskState deserializeSubtaskState(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
 
-	protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, String externalPointer) throws IOException {
 		final boolean hasManagedOperatorState = dis.readInt() != 0;
-		final OperatorStateHandle managedOperatorState = hasManagedOperatorState ? deserializeOperatorStateHandle(dis, externalPointer) : null;
+		final OperatorStateHandle managedOperatorState = hasManagedOperatorState ? deserializeOperatorStateHandle(dis, context) : null;
 
 		final boolean hasRawOperatorState = dis.readInt() != 0;
-		final OperatorStateHandle rawOperatorState = hasRawOperatorState ? deserializeOperatorStateHandle(dis, externalPointer) : null;
+		final OperatorStateHandle rawOperatorState = hasRawOperatorState ? deserializeOperatorStateHandle(dis, context) : null;
 
-		final KeyedStateHandle managedKeyedState = deserializeKeyedStateHandle(dis, externalPointer);
-		final KeyedStateHandle rawKeyedState = deserializeKeyedStateHandle(dis, externalPointer);
+		final KeyedStateHandle managedKeyedState = deserializeKeyedStateHandle(dis, context);
+		final KeyedStateHandle rawKeyedState = deserializeKeyedStateHandle(dis, context);
 
-		StateObjectCollection<InputChannelStateHandle> inputChannelState = deserializeInputChannelStateHandle(dis);
+		StateObjectCollection<InputChannelStateHandle> inputChannelState = deserializeInputChannelStateHandle(dis, context);
 
-		StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState = deserializeResultSubpartitionStateHandle(dis);
+		StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState = deserializeResultSubpartitionStateHandle(dis, context);
 
 		return new OperatorSubtaskState(
 			managedOperatorState,
@@ -273,10 +258,11 @@ public abstract class MetadataV2V3SerializerBase {
 			resultSubpartitionState);
 	}
 
-	@VisibleForTesting
-	public static void serializeKeyedStateHandle(
-			KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+	// ------------------------------------------------------------------------
+	//  keyed state
+	// ------------------------------------------------------------------------
 
+	void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
 		if (stateHandle == null) {
 			dos.writeByte(NULL_HANDLE);
 		} else if (stateHandle instanceof KeyGroupsStateHandle) {
@@ -309,35 +295,10 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	private static void serializeStreamStateHandleMap(
-			Map<StateHandleID, StreamStateHandle> map,
-			DataOutputStream dos) throws IOException {
-
-		dos.writeInt(map.size());
-		for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
-			dos.writeUTF(entry.getKey().toString());
-			serializeStreamStateHandle(entry.getValue(), dos);
-		}
-	}
-
-	private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(
+	KeyedStateHandle deserializeKeyedStateHandle(
 			DataInputStream dis,
-			String externalPointer) throws IOException {
-
-		final int size = dis.readInt();
-		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+			@Nullable DeserializationContext context) throws IOException {
 
-		for (int i = 0; i < size; ++i) {
-			StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
-			result.put(stateHandleID, stateHandle);
-		}
-
-		return result;
-	}
-
-	@VisibleForTesting
-	public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis, String externalPointer) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 
@@ -354,7 +315,7 @@ public abstract class MetadataV2V3SerializerBase {
 			}
 			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
 				keyGroupRange, offsets);
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
 			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
 		} else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
 
@@ -365,9 +326,9 @@ public abstract class MetadataV2V3SerializerBase {
 			KeyGroupRange keyGroupRange =
 				KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
 
-			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, externalPointer);
-			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis, externalPointer);
-			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis, externalPointer);
+			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, context);
+			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis, context);
+			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis, context);
 
 			UUID uuid;
 
@@ -390,10 +351,7 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	@VisibleForTesting
-	public static void serializeOperatorStateHandle(
-		OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
+	void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
 		if (stateHandle != null) {
 			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
 			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
@@ -419,10 +377,9 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	@VisibleForTesting
-	public static OperatorStateHandle deserializeOperatorStateHandle(
+	OperatorStateHandle deserializeOperatorStateHandle(
 			DataInputStream dis,
-			String externalPointer) throws IOException {
+			@Nullable DeserializationContext context) throws IOException {
 
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
@@ -445,17 +402,44 @@ public abstract class MetadataV2V3SerializerBase {
 						new OperatorStateHandle.StateMetaInfo(offsets, mode);
 				offsetsMap.put(key, metaInfo);
 			}
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, externalPointer);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
 			return new OperatorStreamStateHandle(offsetsMap, stateHandle);
 		} else {
 			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
 		}
 	}
 
-	@VisibleForTesting
-	public static void serializeStreamStateHandle(
-			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+	// ------------------------------------------------------------------------
+	//  channel state (unaligned checkpoints)
+	// ------------------------------------------------------------------------
+
+	protected StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return StateObjectCollection.empty();
+	}
 
+	protected StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return StateObjectCollection.empty();
+	}
+
+	protected void serializeResultSubpartitionStateHandle(
+			ResultSubpartitionStateHandle resultSubpartitionStateHandle,
+			DataOutputStream dos) throws IOException {
+	}
+
+	protected void serializeInputChannelStateHandle(
+			InputChannelStateHandle inputChannelStateHandle,
+			DataOutputStream dos) throws IOException {
+	}
+
+	// ------------------------------------------------------------------------
+	//  low-level state handles
+	// ------------------------------------------------------------------------
+
+	static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
 		if (stateHandle == null) {
 			dos.writeByte(NULL_HANDLE);
 
@@ -484,7 +468,10 @@ public abstract class MetadataV2V3SerializerBase {
 		dos.flush();
 	}
 
-	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis, @Nullable String externalPointer) throws IOException {
+	static StreamStateHandle deserializeStreamStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+
 		final int type = dis.read();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -499,14 +486,12 @@ public abstract class MetadataV2V3SerializerBase {
 			dis.readFully(data);
 			return new ByteStreamStateHandle(handleName, data);
 		} else if (RELATIVE_STREAM_STATE_HANDLE == type) {
-			checkArgument(externalPointer != null, "external pointer should not be null when deserializing relative state handle.");
+			if (context == null) {
+				throw new IOException("Cannot deserialize a RelativeFileStateHandle without a context to make it relative to.");
+			}
 			String relativePath = dis.readUTF();
 			long size = dis.readLong();
-			if (exclusiveCheckpointDir == null || (!externalPointer.equals(previousExternalPointer))) {
-				exclusiveCheckpointDir = ((FsCompletedCheckpointStorageLocation) (AbstractFsCheckpointStorage.resolveCheckpointPointer(externalPointer))).getExclusiveCheckpointDir();
-				previousExternalPointer = externalPointer;
-			}
-			Path statePath = new Path(exclusiveCheckpointDir, relativePath);
+			Path statePath = new Path(context.getExclusiveDirPath(), relativePath);
 			return new RelativeFileStateHandle(statePath, relativePath, size);
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
@@ -518,7 +503,7 @@ public abstract class MetadataV2V3SerializerBase {
 	// ------------------------------------------------------------------------
 
 	@Nullable
-	static <T> T extractSingleton(Collection<T> collection) {
+	private static <T> T extractSingleton(Collection<T> collection) {
 		if (collection == null || collection.isEmpty()) {
 			return null;
 		}
@@ -530,27 +515,104 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
-	protected StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dis) throws IOException {
-		return StateObjectCollection.empty();
-	}
-
-	protected StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dis) throws IOException {
-		return StateObjectCollection.empty();
-	}
-
-	protected void serializeResultSubpartitionStateHandle(ResultSubpartitionStateHandle resultSubpartitionStateHandle, DataOutputStream dos) throws IOException {
+	private static <T extends StateObject> void serializeSingleton(
+			StateObjectCollection<T> stateObjectCollection,
+			DataOutputStream dos,
+			BiConsumerWithException<T, DataOutputStream, IOException> cons) throws IOException {
+		final T state = extractSingleton(stateObjectCollection);
+		if (state != null) {
+			dos.writeInt(1);
+			cons.accept(state, dos);
+		} else {
+			dos.writeInt(0);
+		}
 	}
 
-	protected void serializeInputChannelStateHandle(InputChannelStateHandle inputChannelStateHandle, DataOutputStream dos) throws IOException {
-	}
+	static <T extends StateObject> StateObjectCollection<T> deserializeCollection(
+		DataInputStream dis,
+		DeserializationContext context,
+		BiFunctionWithException<DataInputStream, DeserializationContext, T, IOException> s) throws IOException {
 
-	static <T extends StateObject> StateObjectCollection<T> deserializeCollection(DataInputStream dis, FunctionWithException<DataInputStream, T, IOException> s) throws IOException {
 		int size = dis.readInt();
 		List<T> result = new ArrayList<>();
 		for (int i = 0; i < size; i++) {
-			result.add(s.apply(dis));
+			result.add(s.apply(dis, context));
 		}
 		return new StateObjectCollection<>(result);
 	}
 
+	private static void serializeStreamStateHandleMap(
+		Map<StateHandleID, StreamStateHandle> map,
+		DataOutputStream dos) throws IOException {
+
+		dos.writeInt(map.size());
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
+			dos.writeUTF(entry.getKey().toString());
+			serializeStreamStateHandle(entry.getValue(), dos);
+		}
+	}
+
+	private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(
+		DataInputStream dis,
+		@Nullable DeserializationContext context) throws IOException {
+
+		final int size = dis.readInt();
+		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+
+		for (int i = 0; i < size; ++i) {
+			StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
+			result.put(stateHandleID, stateHandle);
+		}
+
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A context that keeps information needed during serialization. This context is passed
+	 * along by the methods. In some sense, this replaces the member fields of the class, because
+	 * the serializer is supposed to be "singleton stateless", and because there are multiple instances
+	 * involved (metadata serializer, channel state serializer).
+	 *
+	 * <p>The alternative to passing this context along would be to change the serializers to
+	 * work as actual instances so that they can keep the state. We might still want to do that,
+	 * but at the time of implementing this, it seems the less invasive change to use this context,
+	 * and it also works with static methods and with different serializers instances that do not know
+	 * of each other.
+	 *
+	 * <p>This context is currently hardwired to the FileSystem-based State Backends.
+	 * At the moment, this works because those are the only ones producing relative file
+	 * paths handles, which are in turn the only ones needing this context.
+	 * In the future, we should refactor this, though, and make the DeserializationContext
+	 * a property of the used checkpoint storage. That makes
+	 */
+	protected static final class DeserializationContext {
+
+		private final String externalPointer;
+
+		private Path cachedExclusiveDirPath;
+
+		DeserializationContext(String externalPointer) {
+			this.externalPointer = externalPointer;
+		}
+
+		Path getExclusiveDirPath() throws IOException {
+			if (cachedExclusiveDirPath == null) {
+				cachedExclusiveDirPath = createExclusiveDirPath(externalPointer);
+			}
+			return cachedExclusiveDirPath;
+		}
+
+		private static Path createExclusiveDirPath(String externalPointer) throws IOException {
+			try {
+				return AbstractFsCheckpointStorage.resolveCheckpointPointer(externalPointer).getExclusiveCheckpointDir();
+			} catch (IOException e) {
+				throw new IOException("Could not parse external pointer as state base path", e);
+			}
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index 3376ce7..6bf98c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -25,10 +25,15 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.function.BiConsumerWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -109,7 +114,7 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 	}
 
 	@Override
-	protected OperatorState deserializeOperatorState(DataInputStream dis, String externalPointer) throws IOException {
+	protected OperatorState deserializeOperatorState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
 		final OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
 		final int parallelism = dis.readInt();
 		final int maxParallelism = dis.readInt();
@@ -117,14 +122,14 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		final OperatorState operatorState = new OperatorState(jobVertexId, parallelism, maxParallelism);
 
 		// Coordinator state
-		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis, externalPointer));
+		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis, context));
 
 		// Sub task states
 		final int numSubTaskStates = dis.readInt();
 
 		for (int j = 0; j < numSubTaskStates; j++) {
 			final int subtaskIndex = dis.readInt();
-			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, externalPointer);
+			final OperatorSubtaskState subtaskState = deserializeSubtaskState(dis, context);
 			operatorState.putState(subtaskIndex, subtaskState);
 		}
 
@@ -139,8 +144,10 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 
 	@VisibleForTesting
 	@Override
-	public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dis) throws IOException {
-		return deserializeCollection(dis, channelStateHandleSerializer::deserializeResultSubpartitionStateHandle);
+	public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return deserializeCollection(dis, context, channelStateHandleSerializer::deserializeResultSubpartitionStateHandle);
 	}
 
 	@VisibleForTesting
@@ -151,8 +158,10 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 
 	@VisibleForTesting
 	@Override
-	public StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dis) throws IOException {
-		return deserializeCollection(dis, channelStateHandleSerializer::deserializeInputChannelStateHandle);
+	public StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+		return deserializeCollection(dis, context, channelStateHandleSerializer::deserializeInputChannelStateHandle);
 	}
 
 	private <T extends StateObject> void serializeCollection(
@@ -169,4 +178,57 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  exposed static methods for test cases
+	//
+	//  NOTE: The fact that certain tests directly call these lower level
+	//        serialization methods is a problem, because that way the tests
+	//        bypass the versioning scheme. Especially tests that test for
+	//        cross-version compatibility need to version themselves if we
+	//        ever break the format of these low level state types.
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+		MetadataV2V3SerializerBase.serializeStreamStateHandle(stateHandle, dos);
+	}
+
+	@VisibleForTesting
+	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+		return MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public static void serializeOperatorStateHandleUtil(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+		INSTANCE.serializeOperatorStateHandle(stateHandle, dos);
+	}
+
+	@VisibleForTesting
+	public static OperatorStateHandle deserializeOperatorStateHandleUtil(DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeOperatorStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public static void serializeKeyedStateHandleUtil(
+			KeyedStateHandle stateHandle,
+			DataOutputStream dos) throws IOException {
+		INSTANCE.serializeKeyedStateHandle(stateHandle, dos);
+	}
+
+	@VisibleForTesting
+	public static KeyedStateHandle deserializeKeyedStateHandleUtil(DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeKeyedStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public static StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(
+			DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeInputChannelStateHandle(dis, null);
+	}
+
+	@VisibleForTesting
+	public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(
+			DataInputStream dis) throws IOException {
+		return INSTANCE.deserializeResultSubpartitionStateHandle(dis, null);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
index d7cf150..0bc0650 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
@@ -210,7 +210,7 @@ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
 	 *                     the pointer points to a location that does not seem to be a checkpoint/savepoint.
 	 */
 	@Internal
-	public static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
+	public static FsCompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
 		checkNotNull(checkpointPointer, "checkpointPointer");
 		checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 2b812ea..b7c1188 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -63,7 +63,7 @@ public class OperatorSnapshotUtil {
 			if (rawOperatorState != null) {
 				dos.writeInt(rawOperatorState.size());
 				for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
-					MetadataV3Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+					MetadataV3Serializer.serializeOperatorStateHandleUtil(operatorStateHandle, dos);
 				}
 			} else {
 				// this means no states, not even an empty list
@@ -74,7 +74,7 @@ public class OperatorSnapshotUtil {
 			if (managedOperatorState != null) {
 				dos.writeInt(managedOperatorState.size());
 				for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
-					MetadataV3Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+					MetadataV3Serializer.serializeOperatorStateHandleUtil(operatorStateHandle, dos);
 				}
 			} else {
 				// this means no states, not even an empty list
@@ -85,7 +85,7 @@ public class OperatorSnapshotUtil {
 			if (rawKeyedState != null) {
 				dos.writeInt(rawKeyedState.size());
 				for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
-					MetadataV3Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+					MetadataV3Serializer.serializeKeyedStateHandleUtil(keyedStateHandle, dos);
 				}
 			} else {
 				// this means no operator states, not even an empty list
@@ -96,7 +96,7 @@ public class OperatorSnapshotUtil {
 			if (managedKeyedState != null) {
 				dos.writeInt(managedKeyedState.size());
 				for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
-					MetadataV3Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+					MetadataV3Serializer.serializeKeyedStateHandleUtil(keyedStateHandle, dos);
 				}
 			} else {
 				// this means no operator states, not even an empty list
@@ -127,15 +127,14 @@ public class OperatorSnapshotUtil {
 			final int v = dis.readInt();
 
 			// still required for compatibility to consume the bytes.
-			MetadataV3Serializer.deserializeStreamStateHandle(dis, null);
+			MetadataV3Serializer.deserializeStreamStateHandle(dis);
 
 			List<OperatorStateHandle> rawOperatorState = null;
 			int numRawOperatorStates = dis.readInt();
 			if (numRawOperatorStates >= 0) {
 				rawOperatorState = new ArrayList<>();
 				for (int i = 0; i < numRawOperatorStates; i++) {
-					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandle(
-						dis, null);
+					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandleUtil(dis);
 					rawOperatorState.add(operatorState);
 				}
 			}
@@ -145,8 +144,7 @@ public class OperatorSnapshotUtil {
 			if (numManagedOperatorStates >= 0) {
 				managedOperatorState = new ArrayList<>();
 				for (int i = 0; i < numManagedOperatorStates; i++) {
-					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandle(
-						dis, null);
+					OperatorStateHandle operatorState = MetadataV3Serializer.deserializeOperatorStateHandleUtil(dis);
 					managedOperatorState.add(operatorState);
 				}
 			}
@@ -156,8 +154,7 @@ public class OperatorSnapshotUtil {
 			if (numRawKeyedStates >= 0) {
 				rawKeyedState = new ArrayList<>();
 				for (int i = 0; i < numRawKeyedStates; i++) {
-					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandle(
-						dis, null);
+					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandleUtil(dis);
 					rawKeyedState.add(keyedState);
 				}
 			}
@@ -167,15 +164,14 @@ public class OperatorSnapshotUtil {
 			if (numManagedKeyedStates >= 0) {
 				managedKeyedState = new ArrayList<>();
 				for (int i = 0; i < numManagedKeyedStates; i++) {
-					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandle(
-						dis, null);
+					KeyedStateHandle keyedState = MetadataV3Serializer.deserializeKeyedStateHandleUtil(dis);
 					managedKeyedState.add(keyedState);
 				}
 			}
 
 			final StateObjectCollection<InputChannelStateHandle> inputChannelStateHandles =
 				v == MetadataV3Serializer.VERSION ?
-					MetadataV3Serializer.INSTANCE.deserializeInputChannelStateHandle(dis) :
+					MetadataV3Serializer.deserializeInputChannelStateHandle(dis) :
 					StateObjectCollection.empty();
 
 			final StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionStateHandles =