Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:32 UTC

[flink] 01/11: [FLINK-12963] [state-processor] Add savepoint writer for bootstrapping new savepoints

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b05d389f063b67edd253836fb71bb50f9f4ade06
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Jun 24 10:47:34 2019 -0500

    [FLINK-12963] [state-processor] Add savepoint writer for bootstrapping new savepoints
---
 .../flink/state/api/BootstrapTransformation.java   | 177 +++++++++
 .../apache/flink/state/api/ExistingSavepoint.java  |  14 +-
 .../state/api/KeyedOperatorTransformation.java     |  86 ++++
 .../api/{Savepoint.java => NewSavepoint.java}      |  25 +-
 .../state/api/OneInputOperatorTransformation.java  | 176 ++++++++
 ...ntMetadata.java => OperatorTransformation.java} |  39 +-
 .../java/org/apache/flink/state/api/Savepoint.java |  40 +-
 ...ta.java => SavepointWriterOperatorFactory.java} |  36 +-
 .../apache/flink/state/api/WritableSavepoint.java  | 129 ++++++
 .../functions/BroadcastStateBootstrapFunction.java |  66 +++
 .../api/functions/KeyedStateBootstrapFunction.java |  71 ++++
 .../api/functions/StateBootstrapFunction.java      |  57 +++
 .../output/BoundedOneInputStreamTaskRunner.java    |  76 ++++
 .../flink/state/api/output/BoundedStreamTask.java  | 124 ++++++
 .../state/api/output/MergeOperatorStates.java      |  57 +++
 .../api/output/OperatorSubtaskStateReducer.java    |  73 ++++
 .../state/api/output/SavepointOutputFormat.java    | 100 +++++
 .../flink/state/api/output/SnapshotUtils.java      |  70 ++++
 .../api/output/TaggedOperatorSubtaskState.java     |  69 ++++
 .../operators/BroadcastStateBootstrapOperator.java | 105 +++++
 .../operators/KeyedStateBootstrapOperator.java     | 114 ++++++
 .../output/operators/StateBootstrapOperator.java   |  93 +++++
 .../partitioner/HashSelector.java}                 |  49 ++-
 .../partitioner/KeyGroupRangePartitioner.java}     |  41 +-
 ...pointMetadata.java => BoundedStreamConfig.java} |  43 +-
 .../metadata/ModifiableSavepointMetadata.java      |  67 ++++
 .../runtime/metadata/OnDiskSavepointMetadata.java  |  96 -----
 .../api/runtime/metadata/SavepointMetadata.java    |  30 +-
 .../state/api/BootstrapTransformationTest.java     | 170 ++++++++
 .../org/apache/flink/state/api/SavepointTest.java  | 118 ++++++
 .../flink/state/api/SavepointWriterITCase.java     | 442 +++++++++++++++++++++
 .../api/output/SavepointOutputFormatTest.java      |  99 +++++
 .../flink/state/api/output/SnapshotUtilsTest.java  | 155 ++++++++
 33 files changed, 2845 insertions(+), 262 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
new file mode 100644
index 0000000..edba718
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner;
+import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
+import org.apache.flink.state.api.output.partitioner.HashSelector;
+import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
+import org.apache.flink.state.api.runtime.BoundedStreamConfig;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.OptionalInt;
+
+/**
+ * Bootstrapped data that can be written into a {@code Savepoint}.
+ * @param <T> The input type of the transformation.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public class BootstrapTransformation<T> {
+	private final DataSet<T> dataSet;
+
+	private final SavepointWriterOperatorFactory factory;
+
+	@Nullable
+	private final HashSelector<T> keySelector;
+
+	@Nullable
+	private final TypeInformation<?> keyType;
+
+	private final OptionalInt operatorMaxParallelism;
+
+	BootstrapTransformation(
+		DataSet<T> dataSet,
+		OptionalInt operatorMaxParallelism,
+		SavepointWriterOperatorFactory factory) {
+		this.dataSet = dataSet;
+		this.operatorMaxParallelism = operatorMaxParallelism;
+		this.factory = factory;
+		this.keySelector = null;
+		this.keyType = null;
+	}
+
+	<K> BootstrapTransformation(
+		DataSet<T> dataSet,
+		OptionalInt operatorMaxParallelism,
+		SavepointWriterOperatorFactory factory,
+		@Nonnull KeySelector<T, K> keySelector,
+		@Nonnull TypeInformation<K> keyType) {
+		this.dataSet = dataSet;
+		this.operatorMaxParallelism = operatorMaxParallelism;
+		this.factory = factory;
+		this.keySelector = new HashSelector<>(keySelector);
+		this.keyType = keyType;
+	}
+
+	/**
+	 * @return The max parallelism for this operator.
+	 */
+	int getMaxParallelism(SavepointMetadata metadata) {
+		return operatorMaxParallelism.orElse(metadata.maxParallelism());
+	}
+
+	/**
+	 * @param operatorID The operator id for the stream operator.
+	 * @param stateBackend The state backend for the job.
+	 * @param metadata Metadata about the resulting savepoint.
+	 * @param savepointPath The path where the savepoint will be written.
+	 * @return The operator subtask states for this bootstrap transformation.
+	 */
+	DataSet<OperatorState> writeOperatorState(
+		OperatorID operatorID,
+		StateBackend stateBackend,
+		SavepointMetadata metadata,
+		Path savepointPath) {
+		int localMaxParallelism = getMaxParallelism(metadata);
+
+		return writeOperatorSubtaskStates(operatorID, stateBackend, savepointPath, localMaxParallelism)
+			.reduceGroup(new OperatorSubtaskStateReducer(operatorID, localMaxParallelism))
+			.name("reduce(OperatorSubtaskState)");
+	}
+
+	@VisibleForTesting
+	MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates(
+		OperatorID operatorID,
+		StateBackend stateBackend,
+		Path savepointPath,
+		int localMaxParallelism) {
+
+		DataSet<T> input = dataSet;
+		if (keySelector != null) {
+			input = dataSet.partitionCustom(new KeyGroupRangePartitioner(localMaxParallelism), keySelector);
+		}
+
+		final StreamConfig config;
+		if (keyType == null) {
+			config = new BoundedStreamConfig();
+		} else {
+			TypeSerializer<?> keySerializer = keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
+			config = new BoundedStreamConfig(keySerializer, keySelector);
+		}
+
+		StreamOperator<TaggedOperatorSubtaskState> operator = factory.getOperator(
+			System.currentTimeMillis(),
+			savepointPath);
+
+		operator = dataSet.clean(operator);
+		config.setStreamOperator(operator);
+
+		config.setOperatorName(operatorID.toHexString());
+		config.setOperatorID(operatorID);
+		config.setStateBackend(stateBackend);
+
+		BoundedOneInputStreamTaskRunner<T> operatorRunner = new BoundedOneInputStreamTaskRunner<>(
+			config,
+			localMaxParallelism
+		);
+
+		MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates = input
+			.mapPartition(operatorRunner)
+			.name(operatorID.toHexString());
+
+		if (operator instanceof BroadcastStateBootstrapOperator) {
+			subtaskStates = subtaskStates.setParallelism(1);
+		} else {
+			int currentParallelism = getParallelism(subtaskStates);
+			if (currentParallelism > localMaxParallelism) {
+				subtaskStates.setParallelism(localMaxParallelism);
+			}
+		}
+		return subtaskStates;
+	}
+
+	private static <T> int getParallelism(MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates) {
+		int parallelism = subtaskStates.getParallelism();
+		if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+			parallelism = subtaskStates.getExecutionEnvironment().getParallelism();
+		}
+
+		return parallelism;
+	}
+}
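
A BootstrapTransformation is never constructed directly; it is produced by the fluent API added later in this commit (OperatorTransformation / OneInputOperatorTransformation). A minimal usage sketch, assuming an ExecutionEnvironment named env and a hypothetical MyBootstrapFunction extending StateBootstrapFunction<Integer>:

    DataSet<Integer> data = env.fromElements(1, 2, 3);

    // transform(...) wraps the function in a savepoint-writing operator via a
    // SavepointWriterOperatorFactory and returns a BootstrapTransformation.
    BootstrapTransformation<Integer> transformation = OperatorTransformation
        .bootstrapWith(data)
        .transform(new MyBootstrapFunction());
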
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
index 9bfd997..a169662 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
@@ -37,8 +37,7 @@ import org.apache.flink.state.api.input.BroadcastStateInputFormat;
 import org.apache.flink.state.api.input.KeyedStateInputFormat;
 import org.apache.flink.state.api.input.ListStateInputFormat;
 import org.apache.flink.state.api.input.UnionStateInputFormat;
-import org.apache.flink.state.api.runtime.metadata.OnDiskSavepointMetadata;
-import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -48,20 +47,21 @@ import java.io.IOException;
  */
 @PublicEvolving
 @SuppressWarnings("WeakerAccess")
-public class ExistingSavepoint {
+public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
 	private final ExecutionEnvironment env;
 
-	private final SavepointMetadata metadata;
+	private final ModifiableSavepointMetadata metadata;
 
 	private final StateBackend stateBackend;
 
-	ExistingSavepoint(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
+	ExistingSavepoint(ExecutionEnvironment env, ModifiableSavepointMetadata metadata, StateBackend stateBackend) throws IOException {
+		super(metadata, stateBackend);
 		Preconditions.checkNotNull(env, "The execution environment must not be null");
-		Preconditions.checkNotNull(path, "The savepoint path must not be null");
+		Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
 		Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
 
 		this.env = env;
-		this.metadata = new OnDiskSavepointMetadata(path);
+		this.metadata = metadata;
 		this.stateBackend = stateBackend;
 	}
 
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java
new file mode 100644
index 0000000..49825cf
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator;
+
+import java.util.OptionalInt;
+
+/**
+ * A {@link KeyedOperatorTransformation} represents a {@link OneInputOperatorTransformation} on which operator state is
+ * partitioned by key using a provided {@link KeySelector}.
+ *
+ * @param <K> The type of the key in the Keyed OperatorTransformation.
+ * @param <T> The type of the elements in the Keyed OperatorTransformation.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public class KeyedOperatorTransformation<K, T> {
+	private final DataSet<T> dataSet;
+
+	private final OptionalInt operatorMaxParallelism;
+
+	private final KeySelector<T, K> keySelector;
+
+	private final TypeInformation<K> keyType;
+
+	KeyedOperatorTransformation(
+		DataSet<T> dataSet,
+		OptionalInt operatorMaxParallelism,
+		KeySelector<T, K> keySelector,
+		TypeInformation<K> keyType) {
+		this.dataSet = dataSet;
+		this.operatorMaxParallelism = operatorMaxParallelism;
+		this.keySelector = keySelector;
+		this.keyType = keyType;
+	}
+
+	/**
+	 * Applies the given {@link KeyedStateBootstrapFunction} on the keyed input.
+	 *
+	 * <p>The function will be called for every element in the input and can be used for writing both
+	 * keyed and operator state into a {@link Savepoint}.
+	 *
+	 * @param processFunction The {@link KeyedStateBootstrapFunction} that is called for each element.
+	 * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+	 */
+	public BootstrapTransformation<T> transform(KeyedStateBootstrapFunction<K, T> processFunction) {
+		SavepointWriterOperatorFactory factory = (timestamp, path) -> new KeyedStateBootstrapOperator<>(timestamp, path, processFunction);
+		return transform(factory);
+	}
+
+	/**
+	 * Method for passing user-defined operators that will transform
+	 * the OperatorTransformation.
+	 *
+	 * <p><b>IMPORTANT:</b> Any output from this operator will be discarded.
+	 *
+	 * @param factory A factory that creates the stream operator performing the transformation.
+	 * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+	 */
+	private BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory) {
+		return new BootstrapTransformation<>(dataSet, operatorMaxParallelism, factory, keySelector, keyType);
+	}
+}
+
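
A sketch of the keyed path, assuming a hypothetical Account POJO with a public int id field, a DataSet<Account> named accounts, and an AccountBootstrapper extending KeyedStateBootstrapFunction<Integer, Account>:

    // keyBy(...) returns a KeyedOperatorTransformation; transform(...) then
    // builds a KeyedStateBootstrapOperator for the given function.
    BootstrapTransformation<Account> transformation = OperatorTransformation
        .bootstrapWith(accounts)
        .keyBy(account -> account.id)
        .transform(new AccountBootstrapper());
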
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
similarity index 51%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
index beea113..f42e886 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
@@ -19,30 +19,15 @@
 package org.apache.flink.state.api;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.runtime.state.StateBackend;
-
-import java.io.IOException;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
 
 /**
- * A {@link Savepoint} is a collection of operator states that can be used to supply initial state
- * when starting a {@link org.apache.flink.streaming.api.datastream.DataStream} job.
+ * A new savepoint.
  */
 @PublicEvolving
-public final class Savepoint {
-
-	private Savepoint() {}
-
-	/**
-	 * Loads an existing savepoint. Useful if you want to query, modify, or extend
-	 * the state of an existing application.
-	 *
-	 * @param env The execution environment used to transform the savepoint.
-	 * @param path The path to an existing savepoint on disk.
-	 * @param stateBackend The state backend of the savepoint used for keyed state.
-	 * @return An existing savepoint that can be queried.
-	 */
-	public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
-		return new ExistingSavepoint(env, path, stateBackend);
+public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
+	NewSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+		super(metadata, stateBackend);
 	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java
new file mode 100644
index 0000000..687eced
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
+import org.apache.flink.state.api.output.operators.StateBootstrapOperator;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
+import java.util.OptionalInt;
+
+/**
+ * {@code OneInputOperatorTransformation} represents a user defined transformation applied on
+ * an {@link OperatorTransformation} with one input.
+ *
+ * @param <T> The type of the elements in this operator.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public class OneInputOperatorTransformation<T> {
+	private final DataSet<T> dataSet;
+
+	private OptionalInt operatorMaxParallelism = OptionalInt.empty();
+
+	OneInputOperatorTransformation(DataSet<T> dataSet) {
+		this.dataSet = dataSet;
+	}
+
+
+	/**
+	 * Sets the maximum parallelism of this operator.
+	 *
+	 * <p>The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the
+	 * number of key groups used for partitioned state.
+	 *
+	 * @param maxParallelism Maximum parallelism
+	 * @return The operator with set maximum parallelism
+	 */
+	@PublicEvolving
+	public OneInputOperatorTransformation<T> setMaxParallelism(int maxParallelism) {
+		this.operatorMaxParallelism = OptionalInt.of(maxParallelism);
+		return this;
+	}
+
+	/**
+	 * Applies the given {@link StateBootstrapFunction} on the non-keyed input.
+	 *
+	 * <p>The function will be called for every element in the input and can be used for writing
+	 * operator state into a {@link Savepoint}.
+	 *
+	 * @param processFunction The {@link StateBootstrapFunction} that is called for each element.
+	 * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+	 */
+	public BootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction) {
+		SavepointWriterOperatorFactory factory = (timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction);
+
+		return transform(factory);
+	}
+
+	/**
+	 * Applies the given {@link BroadcastStateBootstrapFunction} on the non-keyed input.
+	 *
+	 * <p>The function will be called for every element in the input and can be used for writing
+	 * broadcast state into a {@link Savepoint}.
+	 *
+	 * @param processFunction The {@link BroadcastStateBootstrapFunction} that is called for each element.
+	 * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+	 */
+	public BootstrapTransformation<T> transform(BroadcastStateBootstrapFunction<T> processFunction) {
+		SavepointWriterOperatorFactory factory = (timestamp, path) -> new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction);
+
+		return transform(factory);
+	}
+
+	/**
+	 * Method for passing user-defined operators that will transform
+	 * the OperatorTransformation.
+	 *
+	 * <p><b>IMPORTANT:</b> Any output from this operator will be discarded.
+	 *
+	 * @param factory A factory that creates the stream operator performing the transformation.
+	 * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+	 */
+	public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory) {
+		return new BootstrapTransformation<>(dataSet, operatorMaxParallelism, factory);
+	}
+
+	/**
+	 * Creates a new {@link KeyedOperatorTransformation} that uses the provided key for partitioning its operator
+	 * states.
+	 *
+	 * @param keySelector The KeySelector to be used for extracting the key for partitioning.
+	 * @return The {@code KeyedOperatorTransformation} with partitioned state.
+	 */
+	public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector) {
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
+		return new KeyedOperatorTransformation<>(dataSet, operatorMaxParallelism, keySelector, keyType);
+	}
+
+	/**
+	 * Creates a new {@link KeyedOperatorTransformation} that uses the provided key with explicit type
+	 * information for partitioning its operator states.
+	 *
+	 * @param keySelector The KeySelector to be used for extracting the key for partitioning.
+	 * @param keyType The type information describing the key type.
+	 * @return The {@code KeyedOperatorTransformation} with partitioned state.
+	 */
+	public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
+		return new KeyedOperatorTransformation<>(dataSet, operatorMaxParallelism, keySelector, keyType);
+	}
+
+	/**
+	 * Partitions the operator state of a {@link OperatorTransformation} by the given key positions.
+	 *
+	 * @param fields The position of the fields on which the {@code OperatorTransformation} will be grouped.
+	 * @return The {@code KeyedOperatorTransformation} with partitioned state.
+	 */
+	public KeyedOperatorTransformation<Tuple, T> keyBy(int... fields) {
+		if (dataSet.getType() instanceof BasicArrayTypeInfo || dataSet.getType() instanceof PrimitiveArrayTypeInfo) {
+			return keyBy(KeySelectorUtil.getSelectorForArray(fields, dataSet.getType()));
+		} else {
+			return keyBy(new Keys.ExpressionKeys<>(fields, dataSet.getType()));
+		}
+	}
+
+	/**
+	 * Partitions the operator state of a {@link OperatorTransformation} using field expressions. A field expression
+	 * is either the name of a public field or a getter method with parentheses of the {@code
+	 * OperatorTransformation}'s underlying type. A dot can be used to drill down into objects, as in {@code
+	 * "field1.getInnerField2()" }.
+	 *
+	 * @param fields One or more field expressions on which the state of the {@link OperatorTransformation}
+	 *     operators will be partitioned.
+	 * @return The {@code KeyedOperatorTransformation} with partitioned state.
+	 */
+	public KeyedOperatorTransformation<Tuple, T> keyBy(String... fields) {
+		return keyBy(new Keys.ExpressionKeys<>(fields, dataSet.getType()));
+	}
+
+	private KeyedOperatorTransformation<Tuple, T> keyBy(Keys<T> keys) {
+		KeySelector<T, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
+			keys,
+			dataSet.getType(),
+			dataSet.getExecutionEnvironment().getConfig());
+
+		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
+		return new KeyedOperatorTransformation<>(dataSet, operatorMaxParallelism, keySelector, keyType);
+	}
+}
+
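
The keyBy overloads mirror the DataStream API. A sketch of the positional and field-expression variants, assuming counts is a DataSet<Tuple2<String, Long>>:

    // Position-based key, as with DataStream#keyBy(int...).
    KeyedOperatorTransformation<Tuple, Tuple2<String, Long>> byPosition =
        OperatorTransformation.bootstrapWith(counts).keyBy(0);

    // Field-expression key; "f0" names the first tuple field.
    KeyedOperatorTransformation<Tuple, Tuple2<String, Long>> byExpression =
        OperatorTransformation.bootstrapWith(counts).keyBy("f0");
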
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
similarity index 53%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
index 3a2b7cb..7bba046 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
@@ -16,36 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.DataSet;
 
 /**
- * Returns metadata about a savepoint.
+ * An OperatorTransformation represents a single operator within a {@link Savepoint}.
  */
-@Internal
-public interface SavepointMetadata extends Serializable {
-
-	/**
-	 * @return The max parallelism for the savepoint.
-	 */
-	int maxParallelism();
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public abstract class OperatorTransformation {
 
 	/**
-	 * @return Masters states for the savepoint.
-	 */
-	Collection<MasterState> getMasterStates();
-
-	/**
-	 * @return Operator state for the given UID.
+	 * Create a new {@link OperatorTransformation} from a {@link DataSet}.
 	 *
-	 * @throws IOException If the savepoint does not contain operator state with the given uid.
+	 * @param dataSet A dataset of elements.
+	 * @param <T> The type of the input.
+	 * @return A {@link OneInputOperatorTransformation}.
 	 */
-	OperatorState getOperatorState(String uid) throws IOException;
+	public static <T> OneInputOperatorTransformation<T> bootstrapWith(DataSet<T> dataSet) {
+		return new OneInputOperatorTransformation<>(dataSet);
+	}
 }
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
index beea113..9754c5b 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
@@ -20,9 +20,17 @@ package org.apache.flink.state.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+
+import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
 
 /**
  * A {@link Savepoint} is a collection of operator states that can be used to supply initial state
@@ -39,10 +47,36 @@ public final class Savepoint {
 	 *
 	 * @param env The execution environment used to transform the savepoint.
 	 * @param path The path to an existing savepoint on disk.
-	 * @param stateBackend The state backend of the savepoint used for keyed state.
-	 * @return An existing savepoint that can be queried.
+	 * @param stateBackend The state backend of the savepoint.
 	 */
 	public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
-		return new ExistingSavepoint(env, path, stateBackend);
+		org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
+
+		int maxParallelism = savepoint
+			.getOperatorStates()
+			.stream()
+			.map(OperatorState::getMaxParallelism)
+			.max(Comparator.naturalOrder())
+			.orElseThrow(() -> new RuntimeException("Savepoints must contain at least one operator"));
+
+		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, savepoint.getMasterStates(), savepoint.getOperatorStates());
+		return new ExistingSavepoint(env, metadata, stateBackend);
+	}
+
+	/**
+	 * Creates a new savepoint.
+	 *
+	 * @param stateBackend The state backend of the savepoint used for keyed state.
+	 * @param maxParallelism The max parallelism of the savepoint.
+	 * @return A new savepoint.
+	 */
+	public static NewSavepoint create(StateBackend stateBackend, int maxParallelism) {
+		Preconditions.checkArgument(maxParallelism > 0
+				&& maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+			"Maximum parallelism must be between 1 and " + UPPER_BOUND_MAX_PARALLELISM
+				+ ". Found: " + maxParallelism);
+
+		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, Collections.emptyList(), Collections.emptyList());
+		return new NewSavepoint(metadata, stateBackend);
 	}
 }
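
Combined with WritableSavepoint#write below, creating a fresh savepoint is a single fluent chain. A sketch, assuming a previously built BootstrapTransformation named transformation and a hypothetical output path:

    StateBackend backend = new MemoryStateBackend();

    // 128 is the max parallelism (number of key groups) of the new savepoint;
    // it must lie between 1 and UPPER_BOUND_MAX_PARALLELISM.
    Savepoint
        .create(backend, 128)
        .withOperator("my-operator-uid", transformation)
        .write("file:///tmp/new-savepoint");
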
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
similarity index 50%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
index 3a2b7cb..a2b3a60 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
@@ -16,36 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
- * Returns metadata about a savepoint.
+ * Creates a savepoint writing operator from a savepoint path.
  */
-@Internal
-public interface SavepointMetadata extends Serializable {
-
-	/**
-	 * @return The max parallelism for the savepoint.
-	 */
-	int maxParallelism();
-
-	/**
-	 * @return Masters states for the savepoint.
-	 */
-	Collection<MasterState> getMasterStates();
-
-	/**
-	 * @return Operator state for the given UID.
-	 *
-	 * @throws IOException If the savepoint does not contain operator state with the given uid.
-	 */
-	OperatorState getOperatorState(String uid) throws IOException;
+@FunctionalInterface
+public interface SavepointWriterOperatorFactory {
+	StreamOperator<TaggedOperatorSubtaskState> getOperator(long timestamp, Path savepointPath);
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
new file mode 100644
index 0000000..300d1ae
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.output.MergeOperatorStates;
+import org.apache.flink.state.api.output.SavepointOutputFormat;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Any savepoint that can be written to from a batch context.
+ * @param <F> The implementation type.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public abstract class WritableSavepoint<F extends WritableSavepoint> {
+
+	protected final ModifiableSavepointMetadata metadata;
+
+	protected final StateBackend stateBackend;
+
+	WritableSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+		Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
+		Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
+		this.metadata = metadata;
+		this.stateBackend = stateBackend;
+	}
+
+	/**
+	 * Drop an existing operator from the savepoint.
+	 * @param uid The uid of the operator.
+	 * @return A modified savepoint.
+	 */
+	@SuppressWarnings("unchecked")
+	public F removeOperator(String uid) {
+		metadata.removeOperator(uid);
+		return (F) this;
+	}
+
+	/**
+	 * Adds a new operator to the savepoint.
+	 * @param uid The uid of the operator.
+	 * @param transformation The operator to be included.
+	 * @return The modified savepoint.
+	 */
+	@SuppressWarnings("unchecked")
+	public <T> F withOperator(String uid, BootstrapTransformation<T> transformation) {
+		metadata.addOperator(uid, transformation);
+		return (F) this;
+	}
+
+	/**
+	 * Write out a new or updated savepoint.
+	 * @param path The path to where the savepoint should be written.
+	 */
+	public final void write(String path) {
+		final Path savepointPath = new Path(path);
+
+		DataSet<OperatorState> newOperatorStates = getOperatorStates(savepointPath);
+
+		List<OperatorState> existingOperators = getExistingOperatorStates();
+
+		DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);
+
+		finalOperatorStates
+			.reduceGroup(new MergeOperatorStates(metadata))
+			.name("reduce(OperatorState)")
+			.output(new SavepointOutputFormat(savepointPath))
+			.name(path);
+	}
+
+	private List<OperatorState> getExistingOperatorStates() {
+		return metadata
+			.getOperatorStates()
+			.entrySet()
+			.stream()
+			.filter(entry -> entry.getValue().isLeft())
+			.map(entry -> entry.getValue().left())
+			.collect(Collectors.toList());
+	}
+
+	private DataSet<OperatorState> getOperatorStates(Path savepointPath) {
+		return metadata
+			.getOperatorStates()
+			.entrySet()
+			.stream()
+			.filter(entry -> entry.getValue().isRight())
+			.map(entry -> entry.getValue().right().writeOperatorState(entry.getKey(), stateBackend, metadata, savepointPath))
+			.reduce(DataSet::union)
+			.orElseThrow(() -> new IllegalStateException("Savepoints must contain at least one operator"));
+	}
+
+	private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
+		DataSet<OperatorState> finalOperatorStates;
+		if (existingOperators.isEmpty()) {
+			finalOperatorStates = newOperatorStates;
+		} else {
+			DataSet<OperatorState> wrappedCollection = newOperatorStates
+				.getExecutionEnvironment()
+				.fromCollection(existingOperators);
+
+			finalOperatorStates = newOperatorStates.union(wrappedCollection);
+		}
+		return finalOperatorStates;
+	}
+}
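
For an ExistingSavepoint the same fluent calls rewrite a loaded savepoint. A sketch, with hypothetical uids and paths:

    Savepoint
        .load(env, "file:///tmp/old-savepoint", backend)
        .removeOperator("uid-to-drop")
        .withOperator("uid-to-add", transformation)
        .write("file:///tmp/updated-savepoint");
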
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/BroadcastStateBootstrapFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/BroadcastStateBootstrapFunction.java
new file mode 100644
index 0000000..5ac0d81
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/BroadcastStateBootstrapFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+
+/**
+ * Interface for writing elements to broadcast state.
+ *
+ * @param <IN> The type of the input.
+ */
+@PublicEvolving
+public abstract class BroadcastStateBootstrapFunction<IN> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Writes the given value to broadcast state. This function is called for every record.
+	 *
+	 * @param value The input record.
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *     operation to fail and may trigger recovery.
+	 */
+	public abstract void processElement(IN value, Context ctx) throws Exception;
+
+	/**
+	 * Context that a {@link BroadcastStateBootstrapFunction} can use to get additional data about an input
+	 * record.
+	 *
+	 * <p>The context is only valid for the duration of a {@link
+	 * #processElement(Object, Context)} call. Do not store the context and use
+	 * afterwards!
+	 */
+	public interface Context {
+
+		/** Returns the current processing time. */
+		long currentProcessingTime();
+
+		/**
+		 * Fetches the {@link BroadcastState} with the specified name.
+		 *
+		 * @param descriptor the {@link MapStateDescriptor} of the state to be fetched.
+		 * @return The required {@link BroadcastState broadcast state}.
+		 */
+		<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor);
+	}
+}
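
A minimal sketch of an implementation; the class and state names are hypothetical, and the MapStateDescriptor must match the one the streaming job later uses to read the broadcast state:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

    public class RuleBootstrapper extends BroadcastStateBootstrapFunction<String> {
        private static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

        @Override
        public void processElement(String value, Context ctx) throws Exception {
            // Every input record is written straight into broadcast state.
            ctx.getBroadcastState(RULES).put(value, value);
        }
    }
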
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateBootstrapFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateBootstrapFunction.java
new file mode 100644
index 0000000..7703a58
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateBootstrapFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimerService;
+
+/**
+ * A function that writes keyed state to a new operator.
+ *
+ * <p>For every element {@link #processElement(Object, Context)} is invoked. This can write data to
+ * state and set timers.
+ *
+ * <p><b>NOTE:</b> A {@code KeyedStateBootstrapFunction} is always a {@link
+ * org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link
+ * org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown
+ * methods can be implemented. See {@link
+ * org.apache.flink.api.common.functions.RichFunction#open(Configuration)})} and {@link
+ * org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <K> Type of the keys.
+ * @param <IN> Type of the input.
+ */
+@PublicEvolving
+public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Process one element from the input stream.
+	 *
+	 * <p>This function can update internal state or set timers using the {@link Context} parameter.
+	 *
+	 * @param value The input value.
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
+	 *     {@link TimerService} for registering timers and querying the time. The context is only
+	 *     valid during the invocation of this method, do not store it.
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *     operation to fail and may trigger recovery.
+	 */
+	public abstract void processElement(IN value, Context ctx) throws Exception;
+
+	/** Information available in an invocation of {@link #processElement(Object, Context)}. */
+	public abstract class Context {
+
+		/** A {@link TimerService} for querying time and registering timers. */
+		public abstract TimerService timerService();
+
+		/** Get key of the element being processed. */
+		public abstract K getCurrentKey();
+	}
+}
+
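
A sketch of a concrete implementation that seeds a ValueState for each key; class, state name, and input type are hypothetical:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

    public class TotalBootstrapper extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            // Keyed state is available because the transformation was keyed via keyBy(...).
            total = getRuntimeContext().getState(
                new ValueStateDescriptor<>("total", Types.LONG));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            total.update(value.f1);
        }
    }
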
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StateBootstrapFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StateBootstrapFunction.java
new file mode 100644
index 0000000..a91de22
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StateBootstrapFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+/**
+ * Interface for writing elements to operator state.
+ *
+ * @param <IN> The type of the input.
+ */
+@PublicEvolving
+public abstract class StateBootstrapFunction<IN> extends AbstractRichFunction implements CheckpointedFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Writes the given value to operator state. This function is called for every record.
+	 *
+	 * @param value The input record.
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *     operation to fail and may trigger recovery.
+	 */
+	public abstract void processElement(IN value, Context ctx) throws Exception;
+
+	/**
+	 * Context that a {@link StateBootstrapFunction} can use to get additional data about an input
+	 * record.
+	 *
+	 * <p>The context is only valid for the duration of a {@link
+	 * StateBootstrapFunction#processElement(Object, Context)} call. Do not store the context and use
+	 * afterwards!
+	 */
+	public interface Context {
+
+		/** Returns the current processing time. */
+		long currentProcessingTime();
+	}
+}
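
Because StateBootstrapFunction implements CheckpointedFunction, an implementation obtains operator state in initializeState and fills it in processElement. A sketch with hypothetical names:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.state.api.functions.StateBootstrapFunction;

    public class ElementBootstrapper extends StateBootstrapFunction<String> {
        private transient ListState<String> elements;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            elements = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("elements", Types.STRING));
        }

        @Override
        public void processElement(String value, Context ctx) throws Exception {
            elements.add(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            // Nothing to do here; the state written in processElement is captured
            // when the bounded task takes its final snapshot.
        }
    }
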
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java
new file mode 100644
index 0000000..f6d5093
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.state.api.runtime.SavepointEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.util.Collector;
+
+/**
+ * A {@link RichMapPartitionFunction} that serves as the runtime for a {@link
+ * BoundedStreamTask}.
+ *
+ * <p>The task is executed by processing the data in a particular partition instead of pulling from
+ * the network stack. After all data has been processed the runner will output the {@link
+ * OperatorSubtaskState} from the snapshot of the bounded task.
+ *
+ * @param <IN> Type of the input to the partition
+ */
+@Internal
+public class BoundedOneInputStreamTaskRunner<IN> extends RichMapPartitionFunction<IN, TaggedOperatorSubtaskState> {
+	private final StreamConfig streamConfig;
+
+	private final int maxParallelism;
+
+	private transient SavepointEnvironment env;
+
+	/**
+	 * Create a new {@link BoundedOneInputStreamTaskRunner}.
+	 *
+	 * @param streamConfig The internal configuration for the task.
+	 * @param maxParallelism The max parallelism of the operator.
+	 */
+	public BoundedOneInputStreamTaskRunner(
+		StreamConfig streamConfig,
+		int maxParallelism) {
+
+		this.streamConfig = streamConfig;
+		this.maxParallelism = maxParallelism;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		env = new SavepointEnvironment
+			.Builder(getRuntimeContext(), maxParallelism)
+			.setConfiguration(streamConfig.getConfiguration())
+			.build();
+	}
+
+	@Override
+	public void mapPartition(Iterable<IN> values, Collector<TaggedOperatorSubtaskState> out) throws Exception {
+		new BoundedStreamTask<>(env, values, out).invoke();
+	}
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
new file mode 100644
index 0000000..db663da
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+
+/**
+ * A stream task that pulls elements from an {@link Iterable} instead of the network. After all
+ * elements are processed the task takes a snapshot of the subtask operator state. This is a shim
+ * until stream tasks support bounded inputs.
+ *
+ * @param <IN> Type of the input.
+ * @param <OUT> Type of the output.
+ * @param <OP> Type of the operator this task runs.
+ */
+class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & BoundedOneInput>
+	extends StreamTask<OUT, OP> {
+
+	private final Iterator<IN> input;
+
+	private final Collector<OUT> collector;
+
+	private final StreamRecord<IN> reuse;
+
+	BoundedStreamTask(
+		Environment environment,
+		Iterable<IN> input,
+		Collector<OUT> collector) {
+		super(environment, new NeverFireProcessingTimeService());
+		this.input = input.iterator();
+		this.collector = collector;
+		this.reuse = new StreamRecord<>(null);
+	}
+
+	@Override
+	protected void init() throws Exception {
+		Preconditions.checkState(
+			operatorChain.getAllOperators().length == 1,
+			"BoundedStreamTask's should only run a single operator");
+
+		// re-initialize the operator with the correct collector.
+		StreamOperatorFactory<OUT> operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader());
+		headOperator = operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector));
+		headOperator.initializeState();
+		headOperator.open();
+	}
+
+	@Override
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		if (input.hasNext()) {
+			reuse.replace(input.next());
+			headOperator.setKeyContextElement1(reuse);
+			headOperator.processElement(reuse);
+		} else {
+			headOperator.endInput();
+			context.allActionsCompleted();
+		}
+	}
+
+	@Override
+	protected void cancelTask() {}
+
+	@Override
+	protected void cleanup() throws Exception {
+		headOperator.close();
+		headOperator.dispose();
+	}
+
+	private static class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
+
+		private final Collector<OUT> inner;
+
+		private CollectorWrapper(Collector<OUT> inner) {
+			this.inner = inner;
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) { }
+
+		@Override
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { }
+
+		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) { }
+
+		@Override
+		public void collect(StreamRecord<OUT> record) {
+			inner.collect(record.getValue());
+		}
+
+		@Override
+		public void close() { }
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
new file mode 100644
index 0000000..f0a7793
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A reducer that aggregates multiple {@link OperatorState} objects into a single {@link Savepoint}.
+ */
+@Internal
+public class MergeOperatorStates implements GroupReduceFunction<OperatorState, Savepoint> {
+	private final SavepointMetadata metadata;
+
+	public MergeOperatorStates(SavepointMetadata metadata) {
+		Preconditions.checkNotNull(metadata, "Savepoint metadata must not be null");
+
+		this.metadata = metadata;
+	}
+
+	@Override
+	public void reduce(Iterable<OperatorState> values, Collector<Savepoint> out) {
+		Savepoint savepoint =
+			new SavepointV2(
+				SnapshotUtils.CHECKPOINT_ID,
+				StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList()),
+				metadata.getMasterStates());
+
+		out.collect(savepoint);
+	}
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
new file mode 100644
index 0000000..bfd06b3
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A reducer that aggregates all {@link OperatorSubtaskState} instances for a particular operator
+ * into a single {@link OperatorState}.
+ */
+@Internal
+public class OperatorSubtaskStateReducer
+	extends RichGroupReduceFunction<TaggedOperatorSubtaskState, OperatorState> {
+
+	private final OperatorID operatorID;
+
+	private final int maxParallelism;
+
+	public OperatorSubtaskStateReducer(OperatorID operatorID, int maxParallelism) {
+		Preconditions.checkNotNull(operatorID, "Operator id must not be null.");
+		Preconditions.checkArgument(maxParallelism > 0, "Max parallelism must be positive.");
+
+		this.operatorID = operatorID;
+		this.maxParallelism = maxParallelism;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+	}
+
+	@Override
+	public void reduce(Iterable<TaggedOperatorSubtaskState> values, Collector<OperatorState> out) {
+		List<TaggedOperatorSubtaskState> subtasks = StreamSupport
+			.stream(values.spliterator(), false)
+			.collect(Collectors.toList());
+
+		OperatorState operatorState = new OperatorState(operatorID, subtasks.size(), maxParallelism);
+
+		for (TaggedOperatorSubtaskState value : subtasks) {
+			operatorState.putState(value.index, value.state);
+		}
+
+		out.collect(operatorState);
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
new file mode 100644
index 0000000..190de52
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+import org.apache.flink.util.LambdaUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * An output format to serialize {@link Savepoint} metadata to distributed storage.
+ *
+ * <p>This format may only be executed with parallelism 1.
+ */
+@Internal
+public class SavepointOutputFormat extends RichOutputFormat<Savepoint> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SavepointOutputFormat.class);
+
+	private final Path savepointPath;
+
+	private transient CheckpointStorageLocation targetLocation;
+
+	public SavepointOutputFormat(Path savepointPath) {
+		this.savepointPath = savepointPath;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		Preconditions.checkState(numTasks == 1, "SavepointOutputFormat should only be executed with parallelism 1");
+
+		targetLocation = createSavepointLocation(savepointPath);
+	}
+
+	@Override
+	public void writeRecord(Savepoint savepoint) throws IOException {
+		String path = LambdaUtil.withContextClassLoader(getRuntimeContext().getUserCodeClassLoader(), () -> {
+				try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
+					Checkpoints.storeCheckpointMetadata(savepoint, out);
+					CompletedCheckpointStorageLocation finalizedLocation = out.closeAndFinalizeCheckpoint();
+					return finalizedLocation.getExternalPointer();
+				}
+		});
+
+		LOG.info("Savepoint written to {}", path);
+	}
+
+	@Override
+	public void close() {}
+
+	private static CheckpointStorageLocation createSavepointLocation(Path location) throws IOException {
+		final CheckpointStorageLocationReference reference = AbstractFsCheckpointStorage.encodePathAsReference(location);
+		return new FsCheckpointStorageLocation(
+			location.getFileSystem(),
+			location,
+			location,
+			location,
+			reference,
+			CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(),
+			CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue());
+	}
+}
+
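
For context, the two group reducers above and this output format together form the write side of the pipeline. Below is a minimal sketch of the wiring, under the assumption that a bootstrap operator has already produced the tagged subtask states; the helper method and its parameters are illustrative (imports omitted), not the exact code used by the transformation classes in this commit:

    static void writeSavepoint(
            DataSet<TaggedOperatorSubtaskState> subtaskStates,
            OperatorID operatorID,
            int maxParallelism,
            SavepointMetadata metadata,
            Path savepointPath) {

        subtaskStates
            // collapse the per-subtask snapshots into one OperatorState per operator
            .reduceGroup(new OperatorSubtaskStateReducer(operatorID, maxParallelism))
            // merge all operator states into a single Savepoint, carrying the master states
            .reduceGroup(new MergeOperatorStates(metadata))
            // serialize the savepoint metadata file; the format enforces parallelism 1 in open()
            .output(new SavepointOutputFormat(savepointPath))
            .setParallelism(1);
    }
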
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
new file mode 100644
index 0000000..69f1605
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * Takes a final snapshot of the state of an operator subtask.
+ */
+@Internal
+public final class SnapshotUtils {
+	static final long CHECKPOINT_ID = 0L;
+
+	private SnapshotUtils() {}
+
+	public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState snapshot(
+		OP operator,
+		int index,
+		long timestamp,
+		CheckpointStorageWorkerView checkpointStorage,
+		Path savepointPath) throws Exception {
+
+		CheckpointOptions options = new CheckpointOptions(
+			CheckpointType.SAVEPOINT,
+			AbstractFsCheckpointStorage.encodePathAsReference(savepointPath));
+
+		operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
+
+		CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
+			CHECKPOINT_ID,
+			options.getTargetLocation());
+
+		OperatorSnapshotFutures snapshotInProgress = operator.snapshotState(
+			CHECKPOINT_ID,
+			timestamp,
+			options,
+			storage);
+
+		OperatorSubtaskState state = new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
+
+		operator.notifyCheckpointComplete(CHECKPOINT_ID);
+		return new TaggedOperatorSubtaskState(index, state);
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/TaggedOperatorSubtaskState.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/TaggedOperatorSubtaskState.java
new file mode 100644
index 0000000..bf9aacc
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/TaggedOperatorSubtaskState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+
+import java.util.Objects;
+
+/**
+ * A simple wrapper POJO that tags {@link OperatorSubtaskState} with metadata.
+ */
+@Internal
+@SuppressWarnings("WeakerAccess")
+public final class TaggedOperatorSubtaskState {
+
+	public int index;
+
+	public OperatorSubtaskState state;
+
+	public TaggedOperatorSubtaskState(int index, OperatorSubtaskState state) {
+		this.index = index;
+		this.state = state;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		TaggedOperatorSubtaskState that = (TaggedOperatorSubtaskState) o;
+		return index == that.index &&
+			Objects.equals(state, that.state);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(index, state);
+	}
+
+	@Override
+	public String toString() {
+		return "TaggedOperatorSubtaskState{" +
+			"index=" + index +
+			", state=" + state +
+			'}';
+	}
+}
+
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
new file mode 100644
index 0000000..9c6c866
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.output.SnapshotUtils;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing a
+ * {@link BroadcastStateBootstrapFunction}.
+ */
+@Internal
+public class BroadcastStateBootstrapOperator<IN>
+	extends AbstractUdfStreamOperator<TaggedOperatorSubtaskState, BroadcastStateBootstrapFunction<IN>>
+	implements OneInputStreamOperator<IN, TaggedOperatorSubtaskState>,
+	BoundedOneInput {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long timestamp;
+
+	private final Path savepointPath;
+
+	private transient ContextImpl context;
+
+	public BroadcastStateBootstrapOperator(long timestamp, Path savepointPath, BroadcastStateBootstrapFunction<IN> function) {
+		super(function);
+		this.timestamp = timestamp;
+
+		this.savepointPath = savepointPath;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		context = new ContextImpl(getProcessingTimeService());
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		userFunction.processElement(element.getValue(), context);
+	}
+
+	@Override
+	public void endInput() throws Exception {
+		TaggedOperatorSubtaskState state = SnapshotUtils.snapshot(
+			this,
+			getRuntimeContext().getIndexOfThisSubtask(),
+			timestamp,
+			getContainingTask().getCheckpointStorage(),
+			savepointPath);
+
+		output.collect(new StreamRecord<>(state));
+	}
+
+	private class ContextImpl implements BroadcastStateBootstrapFunction.Context {
+		private final ProcessingTimeService processingTimeService;
+
+		ContextImpl(ProcessingTimeService processingTimeService) {
+			this.processingTimeService = processingTimeService;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return processingTimeService.getCurrentProcessingTime();
+		}
+
+		@Override
+		public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor) {
+			try {
+				return getOperatorStateBackend().getBroadcastState(descriptor);
+			} catch (Exception e) {
+				throw new FlinkRuntimeException(e);
+			}
+		}
+	}
+}
+
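
For reference, a bootstrap function executed by this operator could look like the following sketch, modeled on the IT case added later in this commit. The CurrencyRate POJO (public fields currency and rate) and the state descriptor are assumptions; the descriptor must match whatever the restoring job declares:

    public class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> {

        // Assumed descriptor; it must match the one used by the restoring streaming job.
        private static final MapStateDescriptor<String, Double> DESCRIPTOR =
            new MapStateDescriptor<>("currency-rate", Types.STRING, Types.DOUBLE);

        @Override
        public void processElement(CurrencyRate value, Context ctx) throws Exception {
            // Write each rate directly into the broadcast state being bootstrapped.
            ctx.getBroadcastState(DESCRIPTOR).put(value.currency, value.rate);
        }
    }
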
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
new file mode 100644
index 0000000..53cedac
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.output.SnapshotUtils;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.state.api.runtime.VoidTriggerable;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing a
+ * {@link KeyedStateBootstrapFunction}.
+ */
+@Internal
+public class KeyedStateBootstrapOperator<K, IN>
+	extends AbstractUdfStreamOperator<TaggedOperatorSubtaskState, KeyedStateBootstrapFunction<K, IN>>
+	implements OneInputStreamOperator<IN, TaggedOperatorSubtaskState>,
+	BoundedOneInput {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long timestamp;
+
+	private final Path savepointPath;
+
+	private transient KeyedStateBootstrapOperator<K, IN>.ContextImpl context;
+
+	public KeyedStateBootstrapOperator(long timestamp, Path savepointPath, KeyedStateBootstrapFunction<K, IN> function) {
+		super(function);
+
+		this.timestamp = timestamp;
+		this.savepointPath = savepointPath;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService(
+			"user-timers",
+			VoidNamespaceSerializer.INSTANCE,
+			VoidTriggerable.instance());
+
+		TimerService timerService = new SimpleTimerService(internalTimerService);
+
+		context = new KeyedStateBootstrapOperator<K, IN>.ContextImpl(userFunction, timerService);
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		userFunction.processElement(element.getValue(), context);
+	}
+
+	@Override
+	public void endInput() throws Exception {
+		TaggedOperatorSubtaskState state = SnapshotUtils.snapshot(
+			this,
+			getRuntimeContext().getIndexOfThisSubtask(),
+			timestamp,
+			getContainingTask().getCheckpointStorage(),
+			savepointPath);
+
+		output.collect(new StreamRecord<>(state));
+	}
+
+	private class ContextImpl extends KeyedStateBootstrapFunction<K, IN>.Context {
+
+		private final TimerService timerService;
+
+		ContextImpl(KeyedStateBootstrapFunction<K, IN> function, TimerService timerService) {
+			function.super();
+			this.timerService = Preconditions.checkNotNull(timerService);
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public K getCurrentKey() {
+			return (K) KeyedStateBootstrapOperator.this.getCurrentKey();
+		}
+	}
+}
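
By way of example, a keyed bootstrap function driven by this operator could populate keyed state as in the sketch below (again modeled on the IT case in this commit; the Account POJO with a public amount field is an assumption):

    public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {

        private transient ValueState<Double> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor must match the one used by the job that restores the savepoint.
            ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE);
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Account value, Context ctx) throws Exception {
            // Seed the keyed state with the account's starting balance.
            state.update(value.amount);
        }
    }
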
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
new file mode 100644
index 0000000..ca78f87
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.output.SnapshotUtils;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing a
+ * {@link StateBootstrapFunction}.
+ */
+@Internal
+public class StateBootstrapOperator<IN>
+	extends AbstractUdfStreamOperator<TaggedOperatorSubtaskState, StateBootstrapFunction<IN>>
+	implements OneInputStreamOperator<IN, TaggedOperatorSubtaskState>,
+	BoundedOneInput {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long timestamp;
+
+	private final Path savepointPath;
+
+	private transient ContextImpl context;
+
+	public StateBootstrapOperator(long timestamp, Path savepointPath, StateBootstrapFunction<IN> function) {
+		super(function);
+
+		this.timestamp = timestamp;
+		this.savepointPath = savepointPath;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		context = new ContextImpl(getProcessingTimeService());
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		userFunction.processElement(element.getValue(), context);
+	}
+
+	@Override
+	public void endInput() throws Exception {
+		TaggedOperatorSubtaskState state = SnapshotUtils.snapshot(
+			this,
+			getRuntimeContext().getIndexOfThisSubtask(),
+			timestamp,
+			getContainingTask().getCheckpointStorage(),
+			savepointPath);
+
+		output.collect(new StreamRecord<>(state));
+	}
+
+	private class ContextImpl implements StateBootstrapFunction.Context {
+		private final ProcessingTimeService processingTimeService;
+
+		ContextImpl(ProcessingTimeService processingTimeService) {
+			this.processingTimeService = processingTimeService;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return processingTimeService.getCurrentProcessingTime();
+		}
+	}
+}
+
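
A concrete function for this operator registers non-keyed state through the snapshot/initialize hooks, as in this minimal sketch (the state name and types are illustrative):

    public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {

        private transient ListState<Integer> state;

        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            // Append each bootstrap element to the operator list state.
            state.add(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Nothing to do here; the backend snapshots the registered state.
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            state = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("elements", Types.INT));
        }
    }
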
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/HashSelector.java
similarity index 53%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/HashSelector.java
index 3a2b7cb..1d7d4c4 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/HashSelector.java
@@ -16,36 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api.output.partitioner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Preconditions;
 
 /**
- * Returns metadata about a savepoint.
+ * A wrapper around a {@link KeySelector} that returns the {@link Object#hashCode()} of the
+ * extracted key.
+ *
+ * @param <IN> Type of objects to extract the key from.
  */
 @Internal
-public interface SavepointMetadata extends Serializable {
-
-	/**
-	 * @return The max parallelism for the savepoint.
-	 */
-	int maxParallelism();
-
-	/**
-	 * @return Masters states for the savepoint.
-	 */
-	Collection<MasterState> getMasterStates();
-
-	/**
-	 * @return Operator state for the given UID.
-	 *
-	 * @throws IOException If the savepoint does not contain operator state with the given uid.
-	 */
-	OperatorState getOperatorState(String uid) throws IOException;
+public class HashSelector<IN> implements KeySelector<IN, Integer> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final KeySelector<IN, ?> keySelector;
+
+	public HashSelector(KeySelector<IN, ?> keySelector) {
+		Preconditions.checkNotNull(keySelector);
+		this.keySelector = keySelector;
+	}
+
+	@Override
+	public Integer getKey(IN value) throws Exception {
+		return keySelector.getKey(value).hashCode();
+	}
 }
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/KeyGroupRangePartitioner.java
similarity index 53%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/KeyGroupRangePartitioner.java
index 3a2b7cb..64bffa0 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/KeyGroupRangePartitioner.java
@@ -16,36 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api.output.partitioner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
 /**
- * Returns metadata about a savepoint.
+ * A partitioner that selects the target channel based on the key group index.
  */
 @Internal
-public interface SavepointMetadata extends Serializable {
+public class KeyGroupRangePartitioner implements Partitioner<Integer> {
+
+	private static final long serialVersionUID = 1L;
 
-	/**
-	 * @return The max parallelism for the savepoint.
-	 */
-	int maxParallelism();
+	private final int maxParallelism;
 
-	/**
-	 * @return Masters states for the savepoint.
-	 */
-	Collection<MasterState> getMasterStates();
+	public KeyGroupRangePartitioner(int maxParallelism) {
+		this.maxParallelism = maxParallelism;
+	}
 
-	/**
-	 * @return Operator state for the given UID.
-	 *
-	 * @throws IOException If the savepoint does not contain operator state with the given uid.
-	 */
-	OperatorState getOperatorState(String uid) throws IOException;
+	@Override
+	public int partition(Integer key, int numPartitions) {
+		return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+			maxParallelism,
+			numPartitions,
+			KeyGroupRangeAssignment.computeKeyGroupForKeyHash(key, maxParallelism));
+	}
 }
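
Together with HashSelector above, this partitioner routes each record to the subtask that will own its key group after restore. In DataSet terms the routing presumably looks roughly like the following (input, keySelector, and maxParallelism are placeholders):

    DataSet<Account> partitioned = input.partitionCustom(
        // pick the target channel from the key-group index under the given max parallelism
        new KeyGroupRangePartitioner(maxParallelism),
        // partitionCustom needs a comparable key, so hash the user key first
        new HashSelector<>(keySelector));
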
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/BoundedStreamConfig.java
similarity index 52%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/BoundedStreamConfig.java
index 3a2b7cb..6306447 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/BoundedStreamConfig.java
@@ -16,36 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 
 /**
- * Returns metadata about a savepoint.
+ * A {@link StreamConfig} for bounded stream tasks, with chaining and exactly-once checkpointing pre-configured.
  */
 @Internal
-public interface SavepointMetadata extends Serializable {
+public class BoundedStreamConfig extends StreamConfig {
+	public BoundedStreamConfig() {
+		super(new Configuration());
 
-	/**
-	 * @return The max parallelism for the savepoint.
-	 */
-	int maxParallelism();
+		setChainStart();
+		setCheckpointingEnabled(true);
+		setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);
+	}
 
-	/**
-	 * @return Masters states for the savepoint.
-	 */
-	Collection<MasterState> getMasterStates();
+	public <IN> BoundedStreamConfig(TypeSerializer<?> keySerializer, KeySelector<IN, ?> keySelector) {
+		this();
 
-	/**
-	 * @return Operator state for the given UID.
-	 *
-	 * @throws IOException If the savepoint does not contain operator state with the given uid.
-	 */
-	OperatorState getOperatorState(String uid) throws IOException;
+		setStateKeySerializer(keySerializer);
+		setStatePartitioner(0, keySelector);
+	}
 }
+
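
A sketch of how such a config might be assembled for a keyed bootstrap operator, assuming StreamConfig#setStreamOperator is used to hand the operator to the task (keySerializer, keySelector, timestamp, savepointPath, and function are placeholders):

    // The serializer/selector pair must describe the same keying as the restoring job.
    StreamConfig config = new BoundedStreamConfig(keySerializer, keySelector);
    config.setStreamOperator(new KeyedStateBootstrapOperator<>(timestamp, savepointPath, function));
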
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
new file mode 100644
index 0000000..158fa35
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.runtime.metadata;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.BootstrapTransformation;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.types.Either;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Savepoint metadata that can be modified.
+ */
+@Internal
+public class ModifiableSavepointMetadata extends SavepointMetadata {
+
+	private transient Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> operatorStateIndex;
+
+	public ModifiableSavepointMetadata(int maxParallelism, Collection<MasterState> masterStates, Collection<OperatorState> initialStates) {
+		super(maxParallelism, masterStates);
+
+		this.operatorStateIndex = new HashMap<>(initialStates.size());
+
+		for (OperatorState operatorState : initialStates) {
+			operatorStateIndex.put(operatorState.getOperatorID(), Either.Left(operatorState));
+		}
+	}
+
+	/**
+	 * @return Operator state for the given UID.
+	 *
+	 * @throws IOException If the savepoint does not contain operator state with the given uid.
+	 */
+	public OperatorState getOperatorState(String uid) throws IOException {
+		OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
+
+		Either<OperatorState, BootstrapTransformation<?>> operatorState = operatorStateIndex.get(operatorID);
+		if (operatorState == null || operatorState.isRight()) {
+			throw new IOException("Savepoint does not contain state with operator uid " + uid);
+		}
+
+		return operatorState.left();
+	}
+
+	public void removeOperator(String uid) {
+		operatorStateIndex.remove(OperatorIDGenerator.fromUid(uid));
+	}
+
+	public void addOperator(String uid, BootstrapTransformation<?> transformation) {
+		OperatorID id = OperatorIDGenerator.fromUid(uid);
+
+		if (operatorStateIndex.containsKey(id)) {
+			throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uids must be unique");
+		}
+
+		operatorStateIndex.put(id, Either.Right(transformation));
+	}
+
+	public Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> getOperatorStates() {
+		return operatorStateIndex;
+	}
+}
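
In practice the writer tracks existing state as Either.Left and pending bootstrap transformations as Either.Right. A short sketch of the intended usage (uids, existingStates, and transformation are placeholders; imports omitted):

    ModifiableSavepointMetadata metadata =
        new ModifiableSavepointMetadata(128, Collections.emptyList(), existingStates);

    metadata.removeOperator("stale-uid");            // drop state carried over from the old savepoint
    metadata.addOperator("new-uid", transformation); // queue a transformation to run on write

    // Left = state read from an existing savepoint, Right = state still to be bootstrapped.
    Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> states =
        metadata.getOperatorStates();
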
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OnDiskSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OnDiskSavepointMetadata.java
deleted file mode 100644
index c4010d6..0000000
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OnDiskSavepointMetadata.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.state.api.runtime.metadata;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.state.api.runtime.OperatorIDGenerator;
-import org.apache.flink.state.api.runtime.SavepointLoader;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Returns metadata about an existing savepoint.
- */
-@Internal
-public class OnDiskSavepointMetadata implements SavepointMetadata {
-
-	private static final long serialVersionUID = 3623389893479485802L;
-
-	private final int maxParallelism;
-
-	private final Collection<MasterState> masterStates;
-
-	private final Map<OperatorID, OperatorState> operatorStateIndex;
-
-	public OnDiskSavepointMetadata(String path) throws IOException {
-		Savepoint savepoint = SavepointLoader.loadSavepoint(path);
-
-		this.maxParallelism = savepoint
-			.getOperatorStates()
-			.stream()
-			.map(OperatorState::getMaxParallelism)
-			.max(Comparator.naturalOrder())
-			.orElseThrow(() -> new RuntimeException("Savepoint's must contain at least one operator"));
-
-		this.masterStates = savepoint.getMasterStates();
-
-		Collection<OperatorState> operatorStates = savepoint.getOperatorStates();
-		this.operatorStateIndex = new HashMap<>(operatorStates.size());
-		operatorStates.forEach(state -> this.operatorStateIndex.put(state.getOperatorID(), state));
-	}
-
-	/**
-	 * @return The max parallelism for the savepoint.
-	 */
-	@Override
-	public int maxParallelism() {
-		return maxParallelism;
-	}
-
-	/**
-	 * @return Masters states for the savepoint.
-	 */
-	@Override
-	public Collection<MasterState> getMasterStates() {
-		return masterStates;
-	}
-
-	/**
-	 * @return Operator state for the given UID.
-	 */
-	@Override
-	public OperatorState getOperatorState(String uid) throws IOException {
-		OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
-
-		OperatorState operatorState = operatorStateIndex.get(operatorID);
-		if (operatorState == null) {
-			throw new IOException("Savepoint does not contain state with operator uid " + uid);
-		}
-
-		return operatorState;
-	}
-}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
index 3a2b7cb..40ce66a 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
@@ -20,9 +20,7 @@ package org.apache.flink.state.api.runtime.metadata;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 
@@ -30,22 +28,30 @@ import java.util.Collection;
  * Returns metadata about a savepoint.
  */
 @Internal
-public interface SavepointMetadata extends Serializable {
+public class SavepointMetadata implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int maxParallelism;
+
+	private final Collection<MasterState> masterStates;
+
+	public SavepointMetadata(int maxParallelism, Collection<MasterState> masterStates) {
+		this.maxParallelism = maxParallelism;
+		this.masterStates = masterStates;
+	}
 
 	/**
 	 * @return The max parallelism for the savepoint.
 	 */
-	int maxParallelism();
+	public int maxParallelism() {
+		return maxParallelism;
+	}
 
 	/**
 	 * @return Masters states for the savepoint.
 	 */
-	Collection<MasterState> getMasterStates();
-
-	/**
-	 * @return Operator state for the given UID.
-	 *
-	 * @throws IOException If the savepoint does not contain operator state with the given uid.
-	 */
-	OperatorState getOperatorState(String uid) throws IOException;
+	public Collection<MasterState> getMasterStates() {
+		return masterStates;
+	}
 }
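
With the metadata in place, the end-to-end flow enabled by this commit reduces to a few lines. A sketch using the public entry points introduced elsewhere in this change (the Account POJO, the AccountBootstrapper from the sketch above, and the uid and path are illustrative):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    BootstrapTransformation<Account> transformation = OperatorTransformation
        .bootstrapWith(env.fromCollection(accounts))
        .keyBy(acc -> acc.id)
        .transform(new AccountBootstrapper());

    Savepoint
        .create(new MemoryStateBackend(), 128) // 128 = max parallelism of the new savepoint
        .withOperator("accounts", transformation)
        .write(savepointPath);
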
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
new file mode 100644
index 0000000..707ccde
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.Operator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests for bootstrap transformations.
+ */
+public class BootstrapTransformationTest extends AbstractTestBase {
+
+	@Test
+	public void testBroadcastStateTransformationParallelism() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(10);
+
+		DataSet<Integer> input = env.fromElements(0);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.transform(new ExampleBroadcastStateBootstrapFunction());
+
+		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+			OperatorIDGenerator.fromUid("uid"),
+			new MemoryStateBackend(),
+			new Path(),
+			maxParallelism
+		);
+
+		Assert.assertEquals("Broadcast transformations should always be run at parallelism 1",
+			1,
+			getParallelism(result));
+	}
+
+	@Test
+	public void testDefaultParallelismRespectedWhenLessThanMaxParallelism() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSource<Integer> input = env.fromElements(0);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.transform(new ExampleStateBootstrapFunction());
+
+		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(10, Collections.emptyList()));
+		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+			OperatorIDGenerator.fromUid("uid"),
+			new MemoryStateBackend(),
+			new Path(),
+			maxParallelism
+		);
+
+		Assert.assertEquals(
+			"The parallelism of a data set should not change when it is less than the max parallelism of the savepoint",
+			ExecutionConfig.PARALLELISM_DEFAULT,
+			getParallelism(result));
+	}
+
+	@Test
+	public void testMaxParallelismRespected() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(10);
+
+		DataSource<Integer> input = env.fromElements(0);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.transform(new ExampleStateBootstrapFunction());
+
+		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+			OperatorIDGenerator.fromUid("uid"),
+			new MemoryStateBackend(),
+			new Path(),
+			maxParallelism
+		);
+
+		Assert.assertEquals(
+			"The parallelism of a data set should be constrained by the savepoint max parallelism",
+			4,
+			getParallelism(result));
+	}
+
+	@Test
+	public void testOperatorSpecificMaxParallelismRespected() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSource<Integer> input = env.fromElements(0);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.setMaxParallelism(1)
+			.transform(new ExampleStateBootstrapFunction());
+
+		int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+		DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+			OperatorIDGenerator.fromUid("uid"),
+			new MemoryStateBackend(),
+			new Path(),
+			maxParallelism
+		);
+
+		Assert.assertEquals("The parallelism of a data set should be constrained by the savepoint max parallelism", 1, getParallelism(result));
+	}
+
+	private static <T> int getParallelism(DataSet<T> dataSet) {
+		// All concrete implementations of DataSet are operators, so this cast is always safe.
+		return ((Operator) dataSet).getParallelism();
+	}
+
+	private static class ExampleBroadcastStateBootstrapFunction extends BroadcastStateBootstrapFunction<Integer> {
+
+		@Override
+		public void processElement(Integer value, Context ctx) throws Exception {
+		}
+	}
+
+	private static class ExampleStateBootstrapFunction extends StateBootstrapFunction<Integer> {
+
+		@Override
+		public void processElement(Integer value, Context ctx) throws Exception {
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+		}
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
new file mode 100644
index 0000000..7d34244
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests the API for creating new savepoints.
+ */
+public class SavepointTest {
+
+	private static final String UID = "uid";
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testNewSavepointEnforceUniqueUIDs() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(10);
+
+		DataSource<Integer> input = env.fromElements(0);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.transform(new ExampleStateBootstrapFunction());
+
+		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
+
+		new NewSavepoint(metadata, new MemoryStateBackend())
+			.withOperator(UID, transformation)
+			.withOperator(UID, transformation);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testExistingSavepointEnforceUniqueUIDs() throws IOException {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(10);
+
+		DataSource<Integer> input = env.fromElements(0);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.transform(new ExampleStateBootstrapFunction());
+
+		Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
+			OperatorIDGenerator.fromUid(UID), 1, 4));
+
+		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+
+		new ExistingSavepoint(env, metadata, new MemoryStateBackend())
+			.withOperator(UID, transformation)
+			.withOperator(UID, transformation);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testExistingSavepointEnforceUniqueUIDsWithOldSavepoint() throws IOException {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(10);
+
+		DataSource<Integer> input = env.fromElements(0);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.transform(new ExampleStateBootstrapFunction());
+
+		Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
+			OperatorIDGenerator.fromUid(UID), 1, 4));
+
+		ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+
+		new ExistingSavepoint(env, metadata, new MemoryStateBackend())
+			.withOperator(UID, transformation)
+			.write("");
+	}
+
+	private static class ExampleStateBootstrapFunction extends StateBootstrapFunction<Integer> {
+
+		@Override
+		public void processElement(Integer value, Context ctx) throws Exception {
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+		}
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
new file mode 100644
index 0000000..5621fac
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * IT case for bootstrapping new savepoints and modifying existing ones.
+ */
+@RunWith(value = Parameterized.class)
+public class SavepointWriterITCase extends AbstractTestBase {
+	private static final String ACCOUNT_UID = "accounts";
+
+	private static final String CURRENCY_UID = "currency";
+
+	private static final String MODIFY_UID = "numbers";
+
+	private static final MapStateDescriptor<String, Double> descriptor = new MapStateDescriptor<>(
+		"currency-rate", Types.STRING, Types.DOUBLE);
+
+	private final StateBackend backend;
+
+	private static final Collection<Account> accounts = Arrays.asList(
+		new Account(1, 100.0),
+		new Account(2, 100.0),
+		new Account(3, 100.0));
+
+	private static final Collection<CurrencyRate> currencyRates = Arrays.asList(
+		new CurrencyRate("USD", 1.0),
+		new CurrencyRate("EUR", 1.3)
+	);
+
+	public SavepointWriterITCase(StateBackend backend) throws Exception {
+		this.backend = backend;
+
+		// reset the cluster between parameterized runs so the state backend can be changed
+		miniClusterResource.after();
+		miniClusterResource.before();
+	}
+
+	@Parameterized.Parameters(name = "Savepoint Writer: {0}")
+	public static Collection<StateBackend> data() {
+		return Arrays.asList(
+			new MemoryStateBackend(),
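+			// the cast selects the RocksDBStateBackend(StateBackend) constructor over the deprecated AbstractStateBackend overload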
+			new RocksDBStateBackend((StateBackend) new MemoryStateBackend()));
+	}
+
+	@Test
+	public void testStateBootstrapAndModification() throws Exception {
+		final String savepointPath = getTempDirPath(new AbstractID().toHexString());
+
+		bootstrapState(savepointPath);
+
+		validateBootstrap(savepointPath);
+
+		final String modifyPath = getTempDirPath(new AbstractID().toHexString());
+
+		modifySavepoint(savepointPath, modifyPath);
+
+		validateModification(modifyPath);
+	}
+
+	private void bootstrapState(String savepointPath) throws Exception {
+		ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
+
+		BootstrapTransformation<Account> transformation = OperatorTransformation
+			.bootstrapWith(accountDataSet)
+			.keyBy(acc -> acc.id)
+			.transform(new AccountBootstrapper());
+
+		DataSet<CurrencyRate> currencyDataSet = bEnv.fromCollection(currencyRates);
+
+		BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
+			.bootstrapWith(currencyDataSet)
+			.transform(new CurrencyBootstrapFunction());
+
+		Savepoint
+			.create(backend, 128)
+			.withOperator(ACCOUNT_UID, transformation)
+			.withOperator(CURRENCY_UID, broadcastTransformation)
+			.write(savepointPath);
+
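+		// write() only registers the savepoint sink; the batch job runs when execute() is called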
+		bEnv.execute("Bootstrap");
+	}
+
+	private void validateBootstrap(String savepointPath) throws ProgramInvocationException {
+		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		sEnv.setStateBackend(backend);
+
+		CollectSink.accountList.clear();
+
+		sEnv.fromCollection(accounts)
+			.keyBy(acc -> acc.id)
+			.flatMap(new UpdateAndGetAccount())
+			.uid(ACCOUNT_UID)
+			.addSink(new CollectSink());
+
+		sEnv
+			.fromCollection(currencyRates)
+			.connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
+			.process(new CurrencyValidationFunction())
+			.uid(CURRENCY_UID)
+			.addSink(new DiscardingSink<>());
+
+		JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
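+		// restore from the bootstrapped savepoint; 'false' disallows non-restored state, so every operator state must be mapped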
+		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
+
+		ClusterClient<?> client = miniClusterResource.getClusterClient();
+		client.submitJob(jobGraph, SavepointWriterITCase.class.getClassLoader());
+
+		Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
+	}
+
+	private void modifySavepoint(String savepointPath, String modifyPath) throws Exception {
+		ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> data = bEnv.fromElements(1, 2, 3);
+
+		BootstrapTransformation<Integer> transformation = OperatorTransformation
+			.bootstrapWith(data)
+			.transform(new ModifyProcessFunction());
+
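+		// remove the broadcast operator's state and add a new operator; the account state is carried over unchanged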
+		Savepoint
+			.load(bEnv, savepointPath, backend)
+			.removeOperator(CURRENCY_UID)
+			.withOperator(MODIFY_UID, transformation)
+			.write(modifyPath);
+
+		bEnv.execute("Modifying");
+	}
+
+	private void validateModification(String savepointPath) throws ProgramInvocationException {
+		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		sEnv.setStateBackend(backend);
+
+		CollectSink.accountList.clear();
+
+		DataStream<Account> stream = sEnv.fromCollection(accounts)
+			.keyBy(acc -> acc.id)
+			.flatMap(new UpdateAndGetAccount())
+			.uid(ACCOUNT_UID);
+
+		stream.addSink(new CollectSink());
+
+		stream
+			.map(acc -> acc.id)
+			.map(new StatefulOperator())
+			.uid(MODIFY_UID)
+			.addSink(new DiscardingSink<>());
+
+		JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
+		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
+
+		ClusterClient<?> client = miniClusterResource.getClusterClient();
+		client.submitJob(jobGraph, SavepointWriterITCase.class.getClassLoader());
+
+		Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
+	}
+
+	/**
+	 * A simple POJO representing an account with an id, a balance, and a timestamp.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static class Account {
+		Account(int id, double amount) {
+			this.id = id;
+			this.amount = amount;
+			this.timestamp = 1000L;
+		}
+
+		public int id;
+
+		public double amount;
+
+		public long timestamp;
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof Account && ((Account) obj).id == id && ((Account) obj).amount == amount;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(id, amount);
+		}
+	}
+
+	/**
+	 * A simple POJO representing a currency and its exchange rate.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static class CurrencyRate {
+		public String currency;
+
+		public Double rate;
+
+		CurrencyRate(String currency, double rate) {
+			this.currency = currency;
+			this.rate = rate;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof CurrencyRate
+				&& ((CurrencyRate) obj).currency.equals(currency)
+				&& ((CurrencyRate) obj).rate.equals(rate);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(currency, rate);
+		}
+	}
+
+	/**
+	 * A keyed bootstrap function that writes each account's balance into value state.
+	 */
+	public static class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
+		ValueState<Double> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE);
+			state = getRuntimeContext().getState(descriptor);
+		}
+
+		@Override
+		public void processElement(Account value, Context ctx) throws Exception {
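+			// seed the keyed value state with the account's starting balance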
+			state.update(value.amount);
+		}
+	}
+
+	/**
+	 * A streaming function that reads and updates the keyed state written during bootstrapping.
+	 */
+	public static class UpdateAndGetAccount extends RichFlatMapFunction<Account, Account> {
+		ValueState<Double> state;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE);
+			state = getRuntimeContext().getState(descriptor);
+		}
+
+		@Override
+		public void flatMap(Account value, Collector<Account> out) throws Exception {
+			Double current = state.value();
+			if (current != null) {
+				value.amount += current;
+			}
+
+			state.update(value.amount);
+			out.collect(value);
+		}
+	}
+
+	/**
+	 * A bootstrap function that buffers its input and writes it to union list state on snapshot.
+	 */
+	public static class ModifyProcessFunction extends StateBootstrapFunction<Integer> {
+		List<Integer> numbers;
+
+		ListState<Integer> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			numbers = new ArrayList<>();
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx) {
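+			// buffer elements here; they are flushed to union list state in snapshotState()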
+			numbers.add(value);
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			state.clear();
+			state.addAll(numbers);
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			state = context.getOperatorStateStore().getUnionListState(
+				new ListStateDescriptor<>("numbers", Types.INT)
+			);
+		}
+	}
+
+	/**
+	 * A stateful streaming function that, on restore, verifies the union list state written by
+	 * {@link ModifyProcessFunction}.
+	 */
+	public static class StatefulOperator extends RichMapFunction<Integer, Integer> implements CheckpointedFunction {
+		List<Integer> numbers;
+
+		ListState<Integer> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			numbers = new ArrayList<>();
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			state.clear();
+			state.addAll(numbers);
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			state = context.getOperatorStateStore().getUnionListState(
+				new ListStateDescriptor<>("numbers", Types.INT)
+			);
+
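+			// on restore, every subtask sees the full union state; each bootstrapped element must appear exactly once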
+			if (context.isRestored()) {
+				Set<Integer> expected = new HashSet<>();
+				expected.add(1);
+				expected.add(2);
+				expected.add(3);
+
+				for (Integer number : state.get()) {
+					Assert.assertTrue("Duplicate state", expected.contains(number));
+					expected.remove(number);
+				}
+
+				Assert.assertTrue("Failed to bootstrap all state elements: " + Arrays.toString(expected.toArray()), expected.isEmpty());
+			}
+		}
+
+		@Override
+		public Integer map(Integer value) {
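+			// the output is discarded downstream; this operator exists only to verify its restored state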
+			return null;
+		}
+	}
+
+	/**
+	 * A bootstrap function that populates broadcast state with currency rates.
+	 */
+	public static class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> {
+
+		@Override
+		public void processElement(CurrencyRate value, Context ctx) throws Exception {
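+			// write each rate into broadcast state, keyed by its currency code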
+			ctx.getBroadcastState(descriptor).put(value.currency, value.rate);
+		}
+	}
+
+	/**
+	 * Checks the restored broadcast state.
+	 */
+	public static class CurrencyValidationFunction extends BroadcastProcessFunction<CurrencyRate, CurrencyRate, Void> {
+
+		@Override
+		public void processElement(CurrencyRate value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
+			Assert.assertEquals(
+				"Incorrect currency rate",
+				value.rate,
+				ctx.getBroadcastState(descriptor).get(value.currency),
+				0.0001);
+		}
+
+		@Override
+		public void processBroadcastElement(CurrencyRate value, Context ctx, Collector<Void> out) {
+			//ignore
+		}
+	}
+
+	/**
+	 * A sink that collects account ids into a shared, thread-safe set.
+	 */
+	public static class CollectSink implements SinkFunction<Account> {
+		static Set<Integer> accountList = new ConcurrentSkipListSet<>();
+
+		@Override
+		public void invoke(Account value, Context context) {
+			accountList.add(value.id);
+		}
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
new file mode 100644
index 0000000..7cc4b2f
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collections;
+
+/**
+ * Tests that savepoint metadata written by {@link SavepointOutputFormat} can be read back from disk.
+ */
+public class SavepointOutputFormatTest {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test(expected = IllegalStateException.class)
+	public void testSavepointOutputFormatOnlyWorksWithParallelismOne() throws Exception {
+		Path path = new Path(temporaryFolder.newFolder().getAbsolutePath());
+		SavepointOutputFormat format = createSavepointOutputFormat(path);
+
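+		// opening with more than one task must fail: the savepoint metadata is written by a single task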
+		format.open(0, 2);
+	}
+
+	@Test
+	public void testSavepointOutputFormat() throws Exception {
+		Path path = new Path(temporaryFolder.newFolder().getAbsolutePath());
+		SavepointOutputFormat format = createSavepointOutputFormat(path);
+
+		Savepoint savepoint = createSavepoint();
+
+		format.open(0, 1);
+		format.writeRecord(savepoint);
+		format.close();
+
+		Savepoint savepointOnDisk = SavepointLoader.loadSavepoint(path.getPath());
+
+		Assert.assertEquals(
+			"Incorrect checkpoint id",
+			savepoint.getCheckpointId(),
+			savepointOnDisk.getCheckpointId());
+
+		Assert.assertEquals(
+			"Incorrect number of operator states in savepoint",
+			savepoint.getOperatorStates().size(),
+			savepointOnDisk.getOperatorStates().size());
+
+		Assert.assertEquals(
+			"Incorrect operator state in savepoint",
+			savepoint.getOperatorStates().iterator().next(),
+			savepointOnDisk.getOperatorStates().iterator().next());
+	}
+
+	private SavepointV2 createSavepoint() {
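+		// a minimal savepoint: one operator with a single empty subtask state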
+		OperatorState operatorState = new OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 128);
+
+		operatorState.putState(0, new OperatorSubtaskState());
+		return new SavepointV2(0, Collections.singleton(operatorState), Collections.emptyList());
+	}
+
+	private SavepointOutputFormat createSavepointOutputFormat(Path path) throws Exception {
+		RuntimeContext ctx = new MockStreamingRuntimeContext(false, 1, 0);
+
+		SavepointOutputFormat format = new SavepointOutputFormat(path);
+		format.setRuntimeContext(ctx);
+
+		return format;
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
new file mode 100644
index 0000000..01bcbbc
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests that {@link SnapshotUtils} drives the operator snapshot lifecycle in the correct order.
+ */
+public class SnapshotUtilsTest {
+
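+	// SnapshotUtils#snapshot is expected to trigger exactly these operator lifecycle calls, in this order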
+	private static final List<String> EXPECTED_CALL_OPERATOR_SNAPSHOT = Arrays.asList(
+		"prepareSnapshotPreBarrier",
+		"snapshotState",
+		"notifyCheckpointComplete");
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	private static final List<String> ACTUAL_ORDER_TRACKING =
+		Collections.synchronizedList(new ArrayList<>(EXPECTED_CALL_OPERATOR_SNAPSHOT.size()));
+
+	@Test
+	public void testSnapshotUtilsLifecycle() throws Exception {
+		StreamOperator<Void> operator = new LifecycleOperator();
+		CheckpointStorageWorkerView storage = new MockStateBackend().createCheckpointStorage(new JobID());
+
+		Path path = new Path(folder.newFolder().getAbsolutePath());
+
+		SnapshotUtils.snapshot(operator, 0, 0L, storage, path);
+
+		Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);
+	}
+
+	private static class LifecycleOperator implements StreamOperator<Void> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void open() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("open");
+		}
+
+		@Override
+		public void close() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("close");
+		}
+
+		@Override
+		public void dispose() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("dispose");
+		}
+
+		@Override
+		public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("prepareSnapshotPreBarrier");
+		}
+
+		@Override
+		public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("snapshotState");
+			return new OperatorSnapshotFutures();
+		}
+
+		@Override
+		public void initializeState() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("initializeState");
+		}
+
+		@Override
+		public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("setKeyContextElement1");
+		}
+
+		@Override
+		public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("setKeyContextElement2");
+		}
+
+		@Override
+		public ChainingStrategy getChainingStrategy() {
+			ACTUAL_ORDER_TRACKING.add("getChainingStrategy");
+			return null;
+		}
+
+		@Override
+		public void setChainingStrategy(ChainingStrategy strategy) {
+			ACTUAL_ORDER_TRACKING.add("setChainingStrategy");
+		}
+
+		@Override
+		public MetricGroup getMetricGroup() {
+			ACTUAL_ORDER_TRACKING.add("getMetricGroup");
+			return null;
+		}
+
+		@Override
+		public OperatorID getOperatorID() {
+			ACTUAL_ORDER_TRACKING.add("getOperatorID");
+			return null;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("notifyCheckpointComplete");
+		}
+
+		@Override
+		public void setCurrentKey(Object key) {
+			ACTUAL_ORDER_TRACKING.add("setCurrentKey");
+		}
+
+		@Override
+		public Object getCurrentKey() {
+			ACTUAL_ORDER_TRACKING.add("getCurrentKey");
+			return null;
+		}
+	}
+}