You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:32 UTC
[flink] 01/11: [FLINK-12963] [state-processor] Add savepoint writer
for bootstrapping new savepoints
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b05d389f063b67edd253836fb71bb50f9f4ade06
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Jun 24 10:47:34 2019 -0500
[FLINK-12963] [state-processor] Add savepoint writer for bootstrapping new savepoints
---
.../flink/state/api/BootstrapTransformation.java | 177 +++++++++
.../apache/flink/state/api/ExistingSavepoint.java | 14 +-
.../state/api/KeyedOperatorTransformation.java | 86 ++++
.../api/{Savepoint.java => NewSavepoint.java} | 25 +-
.../state/api/OneInputOperatorTransformation.java | 176 ++++++++
...ntMetadata.java => OperatorTransformation.java} | 39 +-
.../java/org/apache/flink/state/api/Savepoint.java | 40 +-
...ta.java => SavepointWriterOperatorFactory.java} | 36 +-
.../apache/flink/state/api/WritableSavepoint.java | 129 ++++++
.../functions/BroadcastStateBootstrapFunction.java | 66 +++
.../api/functions/KeyedStateBootstrapFunction.java | 71 ++++
.../api/functions/StateBootstrapFunction.java | 57 +++
.../output/BoundedOneInputStreamTaskRunner.java | 76 ++++
.../flink/state/api/output/BoundedStreamTask.java | 124 ++++++
.../state/api/output/MergeOperatorStates.java | 57 +++
.../api/output/OperatorSubtaskStateReducer.java | 73 ++++
.../state/api/output/SavepointOutputFormat.java | 100 +++++
.../flink/state/api/output/SnapshotUtils.java | 70 ++++
.../api/output/TaggedOperatorSubtaskState.java | 69 ++++
.../operators/BroadcastStateBootstrapOperator.java | 105 +++++
.../operators/KeyedStateBootstrapOperator.java | 114 ++++++
.../output/operators/StateBootstrapOperator.java | 93 +++++
.../partitioner/HashSelector.java} | 49 ++-
.../partitioner/KeyGroupRangePartitioner.java} | 41 +-
...pointMetadata.java => BoundedStreamConfig.java} | 43 +-
.../metadata/ModifiableSavepointMetadata.java | 67 ++++
.../runtime/metadata/OnDiskSavepointMetadata.java | 96 -----
.../api/runtime/metadata/SavepointMetadata.java | 30 +-
.../state/api/BootstrapTransformationTest.java | 170 ++++++++
.../org/apache/flink/state/api/SavepointTest.java | 118 ++++++
.../flink/state/api/SavepointWriterITCase.java | 442 +++++++++++++++++++++
.../api/output/SavepointOutputFormatTest.java | 99 +++++
.../flink/state/api/output/SnapshotUtilsTest.java | 155 ++++++++
33 files changed, 2845 insertions(+), 262 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
new file mode 100644
index 0000000..edba718
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner;
+import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
+import org.apache.flink.state.api.output.partitioner.HashSelector;
+import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
+import org.apache.flink.state.api.runtime.BoundedStreamConfig;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.OptionalInt;
+
+/**
+ * Bootstrapped data that can be written into a {@code Savepoint}.
+ * @param <T> The input type of the transformation.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public class BootstrapTransformation<T> {
+ private final DataSet<T> dataSet;
+
+ private final SavepointWriterOperatorFactory factory;
+
+ @Nullable
+ private final HashSelector<T> keySelector;
+
+ @Nullable
+ private final TypeInformation<?> keyType;
+
+ private final OptionalInt operatorMaxParallelism;
+
+ BootstrapTransformation(
+ DataSet<T> dataSet,
+ OptionalInt operatorMaxParallelism,
+ SavepointWriterOperatorFactory factory) {
+ this.dataSet = dataSet;
+ this.operatorMaxParallelism = operatorMaxParallelism;
+ this.factory = factory;
+ this.keySelector = null;
+ this.keyType = null;
+ }
+
+ <K> BootstrapTransformation(
+ DataSet<T> dataSet,
+ OptionalInt operatorMaxParallelism,
+ SavepointWriterOperatorFactory factory,
+ @Nonnull KeySelector<T, K> keySelector,
+ @Nonnull TypeInformation<K> keyType) {
+ this.dataSet = dataSet;
+ this.operatorMaxParallelism = operatorMaxParallelism;
+ this.factory = factory;
+ this.keySelector = new HashSelector<>(keySelector);
+ this.keyType = keyType;
+ }
+
+ /**
+ * @return The max parallelism for this operator.
+ */
+ int getMaxParallelism(SavepointMetadata metadata) {
+ return operatorMaxParallelism.orElse(metadata.maxParallelism());
+ }
+
+ /**
+ * @param operatorID The operator id for the stream operator.
+ * @param stateBackend The state backend for the job.
+ * @param metadata Metadata about the resulting savepoint.
+ * @param savepointPath The path where the savepoint will be written.
+ * @return The operator state for this bootstrap transformation, reduced from its subtask states.
+ */
+ DataSet<OperatorState> writeOperatorState(
+ OperatorID operatorID,
+ StateBackend stateBackend,
+ SavepointMetadata metadata,
+ Path savepointPath) {
+ int localMaxParallelism = getMaxParallelism(metadata);
+
+ return writeOperatorSubtaskStates(operatorID, stateBackend, savepointPath, localMaxParallelism)
+ .reduceGroup(new OperatorSubtaskStateReducer(operatorID, localMaxParallelism))
+ .name("reduce(OperatorSubtaskState)");
+ }
+
+ @VisibleForTesting
+ MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates(
+ OperatorID operatorID,
+ StateBackend stateBackend,
+ Path savepointPath,
+ int localMaxParallelism) {
+
+ DataSet<T> input = dataSet;
+ if (keySelector != null) {
+ input = dataSet.partitionCustom(new KeyGroupRangePartitioner(localMaxParallelism), keySelector);
+ }
+
+ final StreamConfig config;
+ if (keyType == null) {
+ config = new BoundedStreamConfig();
+ } else {
+ TypeSerializer<?> keySerializer = keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
+ config = new BoundedStreamConfig(keySerializer, keySelector);
+ }
+
+ StreamOperator<TaggedOperatorSubtaskState> operator = factory.getOperator(
+ System.currentTimeMillis(),
+ savepointPath);
+
+ operator = dataSet.clean(operator);
+ config.setStreamOperator(operator);
+
+ config.setOperatorName(operatorID.toHexString());
+ config.setOperatorID(operatorID);
+ config.setStateBackend(stateBackend);
+
+ BoundedOneInputStreamTaskRunner<T> operatorRunner = new BoundedOneInputStreamTaskRunner<>(
+ config,
+ localMaxParallelism
+ );
+
+ MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates = input
+ .mapPartition(operatorRunner)
+ .name(operatorID.toHexString());
+
+ if (operator instanceof BroadcastStateBootstrapOperator) {
+ subtaskStates = subtaskStates.setParallelism(1);
+ } else {
+ int currentParallelism = getParallelism(subtaskStates);
+ if (currentParallelism > localMaxParallelism) {
+ subtaskStates.setParallelism(localMaxParallelism);
+ }
+ }
+ return subtaskStates;
+ }
+
+ private static <T> int getParallelism(MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates) {
+ int parallelism = subtaskStates.getParallelism();
+ if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+ parallelism = subtaskStates.getExecutionEnvironment().getParallelism();
+ }
+
+ return parallelism;
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
index 9bfd997..a169662 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
@@ -37,8 +37,7 @@ import org.apache.flink.state.api.input.BroadcastStateInputFormat;
import org.apache.flink.state.api.input.KeyedStateInputFormat;
import org.apache.flink.state.api.input.ListStateInputFormat;
import org.apache.flink.state.api.input.UnionStateInputFormat;
-import org.apache.flink.state.api.runtime.metadata.OnDiskSavepointMetadata;
-import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -48,20 +47,21 @@ import java.io.IOException;
*/
@PublicEvolving
@SuppressWarnings("WeakerAccess")
-public class ExistingSavepoint {
+public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
private final ExecutionEnvironment env;
- private final SavepointMetadata metadata;
+ private final ModifiableSavepointMetadata metadata;
private final StateBackend stateBackend;
- ExistingSavepoint(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
+ ExistingSavepoint(ExecutionEnvironment env, ModifiableSavepointMetadata metadata, StateBackend stateBackend) throws IOException {
+ super(metadata, stateBackend);
Preconditions.checkNotNull(env, "The execution environment must not be null");
- Preconditions.checkNotNull(path, "The savepoint path must not be null");
+ Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
this.env = env;
- this.metadata = new OnDiskSavepointMetadata(path);
+ this.metadata = metadata;
this.stateBackend = stateBackend;
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java
new file mode 100644
index 0000000..49825cf
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator;
+
+import java.util.OptionalInt;
+
+/**
+ * A {@link KeyedOperatorTransformation} represents a {@link OneInputOperatorTransformation} on which operator state is
+ * partitioned by key using a provided {@link KeySelector}.
+ *
+ * @param <K> The type of the key in the Keyed OperatorTransformation.
+ * @param <T> The type of the elements in the Keyed OperatorTransformation.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public class KeyedOperatorTransformation<K, T> {
+ private final DataSet<T> dataSet;
+
+ private final OptionalInt operatorMaxParallelism;
+
+ private final KeySelector<T, K> keySelector;
+
+ private final TypeInformation<K> keyType;
+
+ KeyedOperatorTransformation(
+ DataSet<T> dataSet,
+ OptionalInt operatorMaxParallelism,
+ KeySelector<T, K> keySelector,
+ TypeInformation<K> keyType) {
+ this.dataSet = dataSet;
+ this.operatorMaxParallelism = operatorMaxParallelism;
+ this.keySelector = keySelector;
+ this.keyType = keyType;
+ }
+
+ /**
+ * Applies the given {@link KeyedStateBootstrapFunction} on the keyed input.
+ *
+ * <p>The function will be called for every element in the input and can be used for writing both
+ * keyed and operator state into a {@link Savepoint}.
+ *
+ * @param processFunction The {@link KeyedStateBootstrapFunction} that is called for each element.
+ * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+ */
+ public BootstrapTransformation<T> transform(KeyedStateBootstrapFunction<K, T> processFunction) {
+ SavepointWriterOperatorFactory factory = (timestamp, path) -> new KeyedStateBootstrapOperator<>(timestamp, path, processFunction);
+ return transform(factory);
+ }
+
+ /**
+ * Method for passing user defined operators along with the type information that will transform
+ * the OperatorTransformation.
+ *
+ * <p><b>IMPORTANT:</b> Any output from this operator will be discarded.
+ *
+ * @param factory A factory that creates the operator used to write the savepoint.
+ * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+ */
+ private BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory) {
+ return new BootstrapTransformation<>(dataSet, operatorMaxParallelism, factory, keySelector, keyType);
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
similarity index 51%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
index beea113..f42e886 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
@@ -19,30 +19,15 @@
package org.apache.flink.state.api;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.StateBackend;
-
-import java.io.IOException;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
/**
- * A {@link Savepoint} is a collection of operator states that can be used to supply initial state
- * when starting a {@link org.apache.flink.streaming.api.datastream.DataStream} job.
+ * A new savepoint.
*/
@PublicEvolving
-public final class Savepoint {
-
- private Savepoint() {}
-
- /**
- * Loads an existing savepoint. Useful if you want to query, modify, or extend
- * the state of an existing application.
- *
- * @param env The execution enviornment used to transform the savepoint.
- * @param path The path to an existing savepoint on disk.
- * @param stateBackend The state backend of the savepoint used for keyed state.
- * @return An existing savepoint that can be queried.
- */
- public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
- return new ExistingSavepoint(env, path, stateBackend);
+public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
+ NewSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+ super(metadata, stateBackend);
}
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java
new file mode 100644
index 0000000..687eced
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
+import org.apache.flink.state.api.output.operators.StateBootstrapOperator;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
+import java.util.OptionalInt;
+
+/**
+ * {@code OneInputOperatorTransformation} represents a user defined transformation applied on
+ * an {@link OperatorTransformation} with one input.
+ *
+ * @param <T> The type of the elements in this operator.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public class OneInputOperatorTransformation<T> {
+ private final DataSet<T> dataSet;
+
+ private OptionalInt operatorMaxParallelism = OptionalInt.empty();
+
+ OneInputOperatorTransformation(DataSet<T> dataSet) {
+ this.dataSet = dataSet;
+ }
+
+
+ /**
+ * Sets the maximum parallelism of this operator.
+ *
+ * <p>The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the
+ * number of key groups used for partitioned state.
+ *
+ * @param maxParallelism Maximum parallelism
+ * @return The operator with set maximum parallelism
+ */
+ @PublicEvolving
+ public OneInputOperatorTransformation<T> setMaxParallelism(int maxParallelism) {
+ this.operatorMaxParallelism = OptionalInt.of(maxParallelism);
+ return this;
+ }
+
+ /**
+ * Applies the given {@link StateBootstrapFunction} on the non-keyed input.
+ *
+ * <p>The function will be called for every element in the input and can be used for writing
+ * operator state into a {@link Savepoint}.
+ *
+ * @param processFunction The {@link StateBootstrapFunction} that is called for each element.
+ * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+ */
+ public BootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction) {
+ SavepointWriterOperatorFactory factory = (timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction);
+
+ return transform(factory);
+ }
+
+ /**
+ * Applies the given {@link BroadcastStateBootstrapFunction} on the non-keyed input.
+ *
+ * <p>The function will be called for every element in the input and can be used for writing
+ * broadcast state into a {@link Savepoint}.
+ *
+ * @param processFunction The {@link BroadcastStateBootstrapFunction} that is called for each element.
+ * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+ */
+ public BootstrapTransformation<T> transform(BroadcastStateBootstrapFunction<T> processFunction) {
+ SavepointWriterOperatorFactory factory = (timestamp, path) -> new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction);
+
+ return transform(factory);
+ }
+
+ /**
+ * Method for passing user defined operators along with the type information that will transform
+ * the OperatorTransformation.
+ *
+ * <p><b>IMPORTANT:</b> Any output from this operator will be discarded.
+ *
+ * @param factory A factory that creates the operator used to write the savepoint.
+ * @return A {@link BootstrapTransformation} that can be added to a {@link Savepoint}.
+ */
+ public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory) {
+ return new BootstrapTransformation<>(dataSet, operatorMaxParallelism, factory);
+ }
+
+ /**
+ * It creates a new {@link KeyedOperatorTransformation} that uses the provided key for partitioning its operator
+ * states.
+ *
+ * @param keySelector The KeySelector to be used for extracting the key for partitioning.
+ * @return The {@code KeyedOperatorTransformation} with partitioned state.
+ */
+ public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector) {
+ TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
+ return new KeyedOperatorTransformation<>(dataSet, operatorMaxParallelism, keySelector, keyType);
+ }
+
+ /**
+ * It creates a new {@link KeyedOperatorTransformation} that uses the provided key with explicit type
+ * information for partitioning its operator states.
+ *
+ * @param keySelector The KeySelector to be used for extracting the key for partitioning.
+ * @param keyType The type information describing the key type.
+ * @return The {@code KeyedOperatorTransformation} with partitioned state.
+ */
+ public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
+ return new KeyedOperatorTransformation<>(dataSet, operatorMaxParallelism, keySelector, keyType);
+ }
+
+ /**
+ * Partitions the operator state of a {@link OperatorTransformation} by the given key positions.
+ *
+ * @param fields The position of the fields on which the {@code OperatorTransformation} will be grouped.
+ * @return The {@code KeyedOperatorTransformation} with partitioned state.
+ */
+ public KeyedOperatorTransformation<Tuple, T> keyBy(int... fields) {
+ if (dataSet.getType() instanceof BasicArrayTypeInfo || dataSet.getType() instanceof PrimitiveArrayTypeInfo) {
+ return keyBy(KeySelectorUtil.getSelectorForArray(fields, dataSet.getType()));
+ } else {
+ return keyBy(new Keys.ExpressionKeys<>(fields, dataSet.getType()));
+ }
+ }
+
+ /**
+ * Partitions the operator state of a {@link OperatorTransformation} using field expressions. A field expression
+ * is either the name of a public field or a getter method with parentheses of the {@code
+ * OperatorTransformation}'s underlying type. A dot can be used to drill down into objects, as in {@code
+ * "field1.getInnerField2()" }.
+ *
+ * @param fields One or more field expressions on which the state of the {@link OperatorTransformation}
+ * operators will be partitioned.
+ * @return The {@code KeyedOperatorTransformation} with partitioned state.
+ */
+ public KeyedOperatorTransformation<Tuple, T> keyBy(String... fields) {
+ return keyBy(new Keys.ExpressionKeys<>(fields, dataSet.getType()));
+ }
+
+ private KeyedOperatorTransformation<Tuple, T> keyBy(Keys<T> keys) {
+ KeySelector<T, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
+ keys,
+ dataSet.getType(),
+ dataSet.getExecutionEnvironment().getConfig());
+
+ TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
+ return new KeyedOperatorTransformation<>(dataSet, operatorMaxParallelism, keySelector, keyType);
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
similarity index 53%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
index 3a2b7cb..7bba046 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
@@ -16,36 +16,27 @@
* limitations under the License.
*/
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.DataSet;
/**
- * Returns metadata about a savepoint.
+ * An OperatorTransformation represents a single operator within a {@link Savepoint}.
*/
-@Internal
-public interface SavepointMetadata extends Serializable {
-
- /**
- * @return The max parallelism for the savepoint.
- */
- int maxParallelism();
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public abstract class OperatorTransformation {
/**
- * @return Masters states for the savepoint.
- */
- Collection<MasterState> getMasterStates();
-
- /**
- * @return Operator state for the given UID.
+ * Create a new {@link OperatorTransformation} from a {@link DataSet}.
*
- * @throws IOException If the savepoint does not contain operator state with the given uid.
+ * @param dataSet A dataset of elements.
+ * @param <T> The type of the input.
+ * @return A {@link OneInputOperatorTransformation}.
*/
- OperatorState getOperatorState(String uid) throws IOException;
+ public static <T> OneInputOperatorTransformation<T> bootstrapWith(DataSet<T> dataSet) {
+ return new OneInputOperatorTransformation<>(dataSet);
+ }
}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
index beea113..9754c5b 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
@@ -20,9 +20,17 @@ package org.apache.flink.state.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+
+import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
/**
* A {@link Savepoint} is a collection of operator states that can be used to supply initial state
@@ -39,10 +47,36 @@ public final class Savepoint {
*
* @param env The execution enviornment used to transform the savepoint.
* @param path The path to an existing savepoint on disk.
- * @param stateBackend The state backend of the savepoint used for keyed state.
- * @return An existing savepoint that can be queried.
+ * @param stateBackend The state backend of the savepoint.
*/
public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
- return new ExistingSavepoint(env, path, stateBackend);
+ org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
+
+ int maxParallelism = savepoint
+ .getOperatorStates()
+ .stream()
+ .map(OperatorState::getMaxParallelism)
+ .max(Comparator.naturalOrder())
+ .orElseThrow(() -> new RuntimeException("Savepoint's must contain at least one operator"));
+
+ ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, savepoint.getMasterStates(), savepoint.getOperatorStates());
+ return new ExistingSavepoint(env, metadata, stateBackend);
+ }
+
+ /**
+ * Creates a new savepoint.
+ *
+ * @param stateBackend The state backend of the savepoint used for keyed state.
+ * @param maxParallelism The max parallelism of the savepoint.
+ * @return A new savepoint.
+ */
+ public static NewSavepoint create(StateBackend stateBackend, int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0
+ && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+ "Maximum parallelism must be between 1 and " + UPPER_BOUND_MAX_PARALLELISM
+ + ". Found: " + maxParallelism);
+
+ ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(maxParallelism, Collections.emptyList(), Collections.emptyList());
+ return new NewSavepoint(metadata, stateBackend);
}
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
similarity index 50%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
index 3a2b7cb..a2b3a60 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
@@ -16,36 +16,16 @@
* limitations under the License.
*/
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.StreamOperator;
/**
- * Returns metadata about a savepoint.
+ * Creates a savepoint writing operator from a timestamp and a savepoint path.
*/
-@Internal
-public interface SavepointMetadata extends Serializable {
-
- /**
- * @return The max parallelism for the savepoint.
- */
- int maxParallelism();
-
- /**
- * @return Masters states for the savepoint.
- */
- Collection<MasterState> getMasterStates();
-
- /**
- * @return Operator state for the given UID.
- *
- * @throws IOException If the savepoint does not contain operator state with the given uid.
- */
- OperatorState getOperatorState(String uid) throws IOException;
+@FunctionalInterface
+public interface SavepointWriterOperatorFactory {
+ StreamOperator<TaggedOperatorSubtaskState> getOperator(long timestamp, Path savepointPath);
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
new file mode 100644
index 0000000..300d1ae
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.output.MergeOperatorStates;
+import org.apache.flink.state.api.output.SavepointOutputFormat;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Any savepoint that can be written to from a batch context.
+ * @param <F> The implementation type.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public abstract class WritableSavepoint<F extends WritableSavepoint> {
+
+ protected final ModifiableSavepointMetadata metadata;
+
+ protected final StateBackend stateBackend;
+
+ WritableSavepoint(ModifiableSavepointMetadata metadata, StateBackend stateBackend) {
+ Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
+ Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
+ this.metadata = metadata;
+ this.stateBackend = stateBackend;
+ }
+
+ /**
+ * Drop an existing operator from the savepoint.
+ * @param uid The uid of the operator.
+ * @return A modified savepoint.
+ */
+ @SuppressWarnings("unchecked")
+ public F removeOperator(String uid) {
+ metadata.removeOperator(uid);
+ return (F) this;
+ }
+
+ /**
+ * Adds a new operator to the savepoint.
+ * @param uid The uid of the operator.
+ * @param transformation The operator to be included.
+ * @return The modified savepoint.
+ */
+ @SuppressWarnings("unchecked")
+ public <T> F withOperator(String uid, BootstrapTransformation<T> transformation) {
+ metadata.addOperator(uid, transformation);
+ return (F) this;
+ }
+
+ /**
+ * Write out a new or updated savepoint. This only adds the sink to the batch program and does not execute it.
+ *
+ * @param path The path to where the savepoint should be written.
+ * @throws IllegalStateException If the savepoint contains no newly added operator (see {@link #withOperator}).
+ */
+ public final void write(String path) {
+ final Path savepointPath = new Path(path);
+
+ DataSet<OperatorState> newOperatorStates = getOperatorStates(savepointPath);
+ List<OperatorState> existingOperators = getExistingOperatorStates();
+ DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);
+
+ finalOperatorStates
+ .reduceGroup(new MergeOperatorStates(metadata))
+ .name("reduce(OperatorState)")
+ .output(new SavepointOutputFormat(savepointPath))
+ .name(path);
+ }
+
+ /** Collects the operator states that already exist, i.e. the left-valued entries of the metadata. */
+ private List<OperatorState> getExistingOperatorStates() {
+ return metadata
+ .getOperatorStates()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().isLeft())
+ .map(entry -> entry.getValue().left())
+ .collect(Collectors.toList());
+ }
+
+ /** Unions the operator states written by every right-valued (newly added) entry of the metadata. */
+ private DataSet<OperatorState> getOperatorStates(Path savepointPath) {
+ return metadata
+ .getOperatorStates()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().isRight())
+ .map(entry -> entry.getValue().right().writeOperatorState(entry.getKey(), stateBackend, metadata, savepointPath))
+ .reduce(DataSet::union)
+ .orElseThrow(() -> new IllegalStateException("Savepoints must contain at least one new operator"));
+ }
+
+ /** Adds the existing operator states, if there are any, to the data set of newly written operator states. */
+ private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
+ if (existingOperators.isEmpty()) {
+ return newOperatorStates;
+ }
+
+ DataSet<OperatorState> wrappedCollection = newOperatorStates
+ .getExecutionEnvironment()
+ .fromCollection(existingOperators);
+ return newOperatorStates.union(wrappedCollection);
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/BroadcastStateBootstrapFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/BroadcastStateBootstrapFunction.java
new file mode 100644
index 0000000..5ac0d81
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/BroadcastStateBootstrapFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+
+/**
+ * Base class for functions that write elements to broadcast state.
+ *
+ * @param <IN> The type of the input.
+ */
+@PublicEvolving
+public abstract class BroadcastStateBootstrapFunction<IN> extends AbstractRichFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Writes the given value to broadcast state. This function is called for every record.
+ *
+ * @param value The input record.
+ * @param ctx A {@link Context} giving access to the processing time and to broadcast state.
+ * @throws Exception Throwing an exception will cause the operation to fail and may trigger recovery.
+ */
+ public abstract void processElement(IN value, Context ctx) throws Exception;
+
+ /**
+ * Context that {@link BroadcastStateBootstrapFunction}'s can use for getting the current processing
+ * time and for accessing broadcast state.
+ *
+ * <p>The context is only valid for the duration of a {@link
+ * #processElement(Object, Context)} call. Do not store the context and use it
+ * afterwards!
+ */
+ public interface Context {
+
+ /** Returns the current processing time. */
+ long currentProcessingTime();
+
+ /**
+ * Fetches the {@link BroadcastState} described by the given descriptor.
+ *
+ * @param descriptor the {@link MapStateDescriptor} of the state to be fetched.
+ * @return The required {@link BroadcastState broadcast state}.
+ */
+ <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor);
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateBootstrapFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateBootstrapFunction.java
new file mode 100644
index 0000000..7703a58
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateBootstrapFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimerService;
+
+/**
+ * A function that writes keyed state to a new operator.
+ *
+ * <p>For every element {@link #processElement(Object, Context)} is invoked. This can write data to
+ * state and set timers.
+ *
+ * <p><b>NOTE:</b> A {@code KeyedStateBootstrapFunction} is always a {@link
+ * org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link
+ * org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown
+ * methods can be implemented. See {@link
+ * org.apache.flink.api.common.functions.RichFunction#open(Configuration)} and {@link
+ * org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <K> Type of the keys.
+ * @param <IN> Type of the input.
+ */
+@PublicEvolving
+public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Process one element from the input stream.
+ *
+ * <p>This function can update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param value The input value.
+ * @param ctx A {@link Context} that allows querying the key of the element and getting a
+ * {@link TimerService} for registering timers and querying the time. The context is only
+ * valid during the invocation of this method, do not store it.
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+ * operation to fail and may trigger recovery.
+ */
+ public abstract void processElement(IN value, Context ctx) throws Exception;
+
+ /** Information available in an invocation of {@link #processElement(Object, Context)}. */
+ public abstract class Context {
+
+ /** A {@link TimerService} for querying time and registering timers. */
+ public abstract TimerService timerService();
+
+ /** Get key of the element being processed. */
+ public abstract K getCurrentKey();
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StateBootstrapFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StateBootstrapFunction.java
new file mode 100644
index 0000000..a91de22
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StateBootstrapFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+/**
+ * Base class for functions that write elements to operator state.
+ *
+ * @param <IN> The type of the input.
+ */
+@PublicEvolving
+public abstract class StateBootstrapFunction<IN> extends AbstractRichFunction implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Writes the given value to operator state. This function is called for every record.
+ *
+ * @param value The input record.
+ * @param ctx A {@link Context} giving access to the current processing time.
+ * @throws Exception Throwing an exception will cause the operation to fail and may trigger recovery.
+ */
+ public abstract void processElement(IN value, Context ctx) throws Exception;
+
+ /**
+ * Context that {@link StateBootstrapFunction}'s can use for getting additional data about an input
+ * record.
+ *
+ * <p>The context is only valid for the duration of a {@link
+ * StateBootstrapFunction#processElement(Object, Context)} call. Do not store the context and use it
+ * afterwards!
+ */
+ public interface Context {
+
+ /** Returns the current processing time. */
+ long currentProcessingTime();
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java
new file mode 100644
index 0000000..f6d5093
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.state.api.runtime.SavepointEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.util.Collector;
+
+/**
+ * A {@link RichMapPartitionFunction} that serves as the runtime for a {@link
+ * BoundedStreamTask}.
+ *
+ * <p>The task processes the data of a particular partition instead of pulling it from
+ * the network stack. After all data has been processed the runner will output the {@link
+ * OperatorSubtaskState} from the snapshot of the bounded task.
+ *
+ * @param <IN> Type of the input to the partition
+ */
+@Internal
+public class BoundedOneInputStreamTaskRunner<IN> extends RichMapPartitionFunction<IN, TaggedOperatorSubtaskState> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final StreamConfig streamConfig;
+
+ private final int maxParallelism;
+
+ private transient SavepointEnvironment env;
+
+ /**
+ * Create a new {@link BoundedOneInputStreamTaskRunner}.
+ *
+ * @param streamConfig The internal configuration for the task.
+ * @param maxParallelism The max parallelism of the operator.
+ */
+ public BoundedOneInputStreamTaskRunner(StreamConfig streamConfig, int maxParallelism) {
+ this.streamConfig = streamConfig;
+ this.maxParallelism = maxParallelism;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ env = new SavepointEnvironment
+ .Builder(getRuntimeContext(), maxParallelism)
+ .setConfiguration(streamConfig.getConfiguration())
+ .build();
+ }
+
+ @Override
+ public void mapPartition(Iterable<IN> values, Collector<TaggedOperatorSubtaskState> out) throws Exception {
+ new BoundedStreamTask<>(env, values, out).invoke();
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
new file mode 100644
index 0000000..db663da
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+
+/**
+ * A stream task that pulls elements from an {@link Iterable} instead of the network. After all
+ * elements are processed the task takes a snapshot of the subtask operator state. This is a shim
+ * until stream tasks support bounded inputs.
+ *
+ * @param <IN> Type of the input.
+ * @param <OUT> Type of the output.
+ * @param <OP> Type of the operator this task runs.
+ */
+class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & BoundedOneInput>
+ extends StreamTask<OUT, OP> {
+
+ private final Iterator<IN> input;
+
+ private final Collector<OUT> collector;
+
+ private final StreamRecord<IN> reuse;
+
+ BoundedStreamTask(Environment environment, Iterable<IN> input, Collector<OUT> collector) {
+ super(environment, new NeverFireProcessingTimeService());
+ this.input = input.iterator();
+ this.collector = collector;
+ this.reuse = new StreamRecord<>(null);
+ }
+
+ @Override
+ protected void init() throws Exception {
+ Preconditions.checkState(operatorChain.getAllOperators().length == 1,
+ "BoundedStreamTask's should only run a single operator");
+
+ // re-initialize the operator with the correct collector.
+ StreamOperatorFactory<OUT> operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader());
+ headOperator = operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector));
+ headOperator.initializeState();
+ headOperator.open();
+ }
+
+ @Override
+ protected void performDefaultAction(ActionContext context) throws Exception {
+ if (input.hasNext()) {
+ reuse.replace(input.next());
+ headOperator.setKeyContextElement1(reuse);
+ headOperator.processElement(reuse);
+ } else {
+ headOperator.endInput();
+ context.allActionsCompleted();
+ }
+ }
+
+ @Override
+ protected void cancelTask() {}
+
+ @Override
+ protected void cleanup() throws Exception {
+ try {
+ headOperator.close();
+ } finally {
+ // dispose even if close() throws, so the operator always gets to release its resources
+ headOperator.dispose();
+ }
+ }
+
+ /** Forwards emitted records to the wrapped collector; watermarks, latency markers and side outputs are dropped. */
+ private static class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
+ private final Collector<OUT> inner;
+
+ private CollectorWrapper(Collector<OUT> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) { }
+
+ @Override
+ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { }
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) { }
+
+ @Override
+ public void collect(StreamRecord<OUT> record) {
+ inner.collect(record.getValue());
+ }
+
+ @Override
+ public void close() { }
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
new file mode 100644
index 0000000..f0a7793
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A reducer that aggregates multiple {@link OperatorState}'s into a single {@link Savepoint}.
+ */
+@Internal
+public class MergeOperatorStates implements GroupReduceFunction<OperatorState, Savepoint> {
+ private static final long serialVersionUID = 1L;
+
+ private final SavepointMetadata metadata;
+
+ public MergeOperatorStates(SavepointMetadata metadata) {
+ Preconditions.checkNotNull(metadata, "Savepoint metadata must not be null");
+ this.metadata = metadata;
+ }
+
+ @Override
+ public void reduce(Iterable<OperatorState> values, Collector<Savepoint> out) {
+ Savepoint savepoint = new SavepointV2(
+ SnapshotUtils.CHECKPOINT_ID,
+ StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList()),
+ metadata.getMasterStates());
+
+ out.collect(savepoint);
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
new file mode 100644
index 0000000..bfd06b3
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A reducer that aggregates all {@link OperatorSubtaskState}'s for a particular operator into a
+ * single {@link OperatorState}.
+ */
+@Internal
+public class OperatorSubtaskStateReducer
+ extends RichGroupReduceFunction<TaggedOperatorSubtaskState, OperatorState> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final OperatorID operatorID;
+
+ private final int maxParallelism;
+
+ public OperatorSubtaskStateReducer(OperatorID operatorID, int maxParallelism) {
+ Preconditions.checkNotNull(operatorID, "Operator id must not be null.");
+ // a max parallelism of 1 is a valid setting, so only non-positive values are rejected
+ Preconditions.checkState(maxParallelism > 0, "Max parallelism must be at least 1.");
+ this.operatorID = operatorID;
+ this.maxParallelism = maxParallelism;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ @Override
+ public void reduce(Iterable<TaggedOperatorSubtaskState> values, Collector<OperatorState> out) {
+ List<TaggedOperatorSubtaskState> subtasks = StreamSupport
+ .stream(values.spliterator(), false)
+ .collect(Collectors.toList());
+
+ OperatorState operatorState = new OperatorState(operatorID, subtasks.size(), maxParallelism);
+ for (TaggedOperatorSubtaskState value : subtasks) {
+ operatorState.putState(value.index, value.state);
+ }
+ out.collect(operatorState);
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
new file mode 100644
index 0000000..190de52
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+import org.apache.flink.util.LambdaUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * An output format to serialize {@link Savepoint} metadata to distributed storage.
+ *
+ * <p>This format may only be executed with parallelism 1.
+ */
+@Internal
+public class SavepointOutputFormat extends RichOutputFormat<Savepoint> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(SavepointOutputFormat.class);
+
+ private final Path savepointPath;
+
+ private transient CheckpointStorageLocation targetLocation;
+
+ public SavepointOutputFormat(Path savepointPath) {
+ this.savepointPath = savepointPath;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {}
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ Preconditions.checkState(numTasks == 1, "SavepointOutputFormat should only be executed with parallelism 1");
+ targetLocation = createSavepointLocation(savepointPath);
+ }
+
+ @Override
+ public void writeRecord(Savepoint savepoint) throws IOException {
+ String path = LambdaUtil.withContextClassLoader(getRuntimeContext().getUserCodeClassLoader(), () -> {
+ try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
+ Checkpoints.storeCheckpointMetadata(savepoint, out);
+ CompletedCheckpointStorageLocation finalizedLocation = out.closeAndFinalizeCheckpoint();
+ return finalizedLocation.getExternalPointer();
+ }
+ });
+
+ LOG.info("Savepoint written to {}", path);
+ }
+
+ @Override
+ public void close() {}
+
+ /** Builds the storage location, passing the given path for every path argument and the default size options. */
+ private static CheckpointStorageLocation createSavepointLocation(Path location) throws IOException {
+ final CheckpointStorageLocationReference reference = AbstractFsCheckpointStorage.encodePathAsReference(location);
+ return new FsCheckpointStorageLocation(
+ location.getFileSystem(),
+ location,
+ location,
+ location,
+ reference,
+ CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(),
+ CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue());
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
new file mode 100644
index 0000000..69f1605
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * Takes a final snapshot of the state of an operator subtask.
+ */
+@Internal
+public final class SnapshotUtils {
+ static final long CHECKPOINT_ID = 0L; // the single checkpoint id passed to every call in snapshot()
+
+ private SnapshotUtils() {} // static utility class, not instantiable
+ /** Snapshots {@code operator} as a savepoint targeting {@code savepointPath} and tags the resulting state with {@code index}. */
+ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState snapshot(
+ OP operator,
+ int index,
+ long timestamp,
+ CheckpointStorageWorkerView checkpointStorage,
+ Path savepointPath) throws Exception {
+ // Savepoint-type checkpoint options whose target location is derived from savepointPath.
+ CheckpointOptions options = new CheckpointOptions(
+ CheckpointType.SAVEPOINT,
+ AbstractFsCheckpointStorage.encodePathAsReference(savepointPath));
+
+ operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
+ // Resolve the stream factory for this checkpoint id and the options' target location.
+ CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
+ CHECKPOINT_ID,
+ options.getTargetLocation());
+
+ OperatorSnapshotFutures snapshotInProgress = operator.snapshotState(
+ CHECKPOINT_ID,
+ timestamp,
+ options,
+ storage);
+ // The finalizer is handed the in-progress futures and yields the job-manager-owned subtask state.
+ OperatorSubtaskState state = new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
+ // Notify the operator of completion before handing the tagged state back.
+ operator.notifyCheckpointComplete(CHECKPOINT_ID);
+ return new TaggedOperatorSubtaskState(index, state);
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/TaggedOperatorSubtaskState.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/TaggedOperatorSubtaskState.java
new file mode 100644
index 0000000..bf9aacc
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/TaggedOperatorSubtaskState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+
+import java.util.Objects;
+
+/**
+ * Simple POJO pairing an {@link OperatorSubtaskState} with an integer index.
+ */
+@Internal
+@SuppressWarnings("WeakerAccess")
+public final class TaggedOperatorSubtaskState {
+
+ public int index;
+
+ public OperatorSubtaskState state;
+
+ public TaggedOperatorSubtaskState(int index, OperatorSubtaskState state) {
+ this.index = index;
+ this.state = state;
+ }
+ /** Equal when the other object has the same class, index and state. */
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ final TaggedOperatorSubtaskState other = (TaggedOperatorSubtaskState) o;
+ final boolean sameIndex = index == other.index;
+ return sameIndex && Objects.equals(state, other.state);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, state);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder("TaggedOperatorSubtaskState{")
+ .append("index=").append(index)
+ .append(", state=").append(state)
+ .append('}').toString();
+ }
+}
+
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
new file mode 100644
index 0000000..9c6c866
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.output.SnapshotUtils;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} that runs a {@link
+ * BroadcastStateBootstrapFunction} and, once its input ends, emits a snapshot of its own state.
+ */
+@Internal
+public class BroadcastStateBootstrapOperator<IN>
+ extends AbstractUdfStreamOperator<TaggedOperatorSubtaskState, BroadcastStateBootstrapFunction<IN>>
+ implements OneInputStreamOperator<IN, TaggedOperatorSubtaskState>,
+ BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+ // Timestamp handed to SnapshotUtils.snapshot when the input ends.
+ private final long timestamp;
+ // Savepoint path handed to SnapshotUtils.snapshot when the input ends.
+ private final Path savepointPath;
+ // Created in open(); transient, so it is not part of the serialized operator.
+ private transient ContextImpl context;
+
+ public BroadcastStateBootstrapOperator(long timestamp, Path savepointPath, BroadcastStateBootstrapFunction<IN> function) {
+ super(function);
+ this.timestamp = timestamp;
+
+ this.savepointPath = savepointPath;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ context = new ContextImpl(getProcessingTimeService());
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ userFunction.processElement(element.getValue(), context);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ TaggedOperatorSubtaskState state = SnapshotUtils.snapshot(
+ this,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ timestamp,
+ getContainingTask().getCheckpointStorage(),
+ savepointPath);
+ // Emit the tagged snapshot as a regular stream record.
+ output.collect(new StreamRecord<>(state));
+ }
+
+ private class ContextImpl implements BroadcastStateBootstrapFunction.Context {
+ private final ProcessingTimeService processingTimeService;
+
+ ContextImpl(ProcessingTimeService processingTimeService) {
+ this.processingTimeService = processingTimeService;
+ }
+
+ @Override
+ public long currentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
+
+ @Override
+ public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor) {
+ try {
+ return getOperatorStateBackend().getBroadcastState(descriptor);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e); // rethrown unchecked: this method declares no checked exceptions
+ }
+ }
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
new file mode 100644
index 0000000..53cedac
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.output.SnapshotUtils;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.state.api.runtime.VoidTriggerable;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} that runs a {@link
+ * KeyedStateBootstrapFunction} and, once its input ends, emits a snapshot of its own state.
+ */
+@Internal
+public class KeyedStateBootstrapOperator<K, IN>
+ extends AbstractUdfStreamOperator<TaggedOperatorSubtaskState, KeyedStateBootstrapFunction<K, IN>>
+ implements OneInputStreamOperator<IN, TaggedOperatorSubtaskState>,
+ BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+ // Timestamp handed to SnapshotUtils.snapshot when the input ends.
+ private final long timestamp;
+ // Savepoint path handed to SnapshotUtils.snapshot when the input ends.
+ private final Path savepointPath;
+ // Created in open(); transient, so it is not part of the serialized operator.
+ private transient KeyedStateBootstrapOperator<K, IN>.ContextImpl context;
+
+ public KeyedStateBootstrapOperator(long timestamp, Path savepointPath, KeyedStateBootstrapFunction<K, IN> function) {
+ super(function);
+
+ this.timestamp = timestamp;
+ this.savepointPath = savepointPath;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ // NOTE(review): VoidTriggerable presumably ignores timer callbacks - confirm against its definition.
+ InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService(
+ "user-timers",
+ VoidNamespaceSerializer.INSTANCE,
+ VoidTriggerable.instance());
+ // Wrap the internal service in the TimerService type that the function's Context exposes.
+ TimerService timerService = new SimpleTimerService(internalTimerService);
+
+ context = new KeyedStateBootstrapOperator<K, IN>.ContextImpl(userFunction, timerService);
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ userFunction.processElement(element.getValue(), context);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ TaggedOperatorSubtaskState state = SnapshotUtils.snapshot(
+ this,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ timestamp,
+ getContainingTask().getCheckpointStorage(),
+ savepointPath);
+ // Emit the tagged snapshot as a regular stream record.
+ output.collect(new StreamRecord<>(state));
+ }
+
+ private class ContextImpl extends KeyedStateBootstrapFunction<K, IN>.Context {
+
+ private final TimerService timerService;
+
+ ContextImpl(KeyedStateBootstrapFunction<K, IN> function, TimerService timerService) {
+ function.super(); // Context is an inner class of the function, so the function is its outer instance
+ this.timerService = Preconditions.checkNotNull(timerService);
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public K getCurrentKey() {
+ return (K) KeyedStateBootstrapOperator.this.getCurrentKey(); // unchecked cast of the enclosing operator's current key to K
+ }
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
new file mode 100644
index 0000000..ca78f87
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.output.SnapshotUtils;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} that runs a {@link
+ * StateBootstrapFunction} and, once its input ends, emits a snapshot of its own state.
+ */
+@Internal
+public class StateBootstrapOperator<IN>
+ extends AbstractUdfStreamOperator<TaggedOperatorSubtaskState, StateBootstrapFunction<IN>>
+ implements OneInputStreamOperator<IN, TaggedOperatorSubtaskState>,
+ BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+ // Timestamp handed to SnapshotUtils.snapshot when the input ends.
+ private final long timestamp;
+ // Savepoint path handed to SnapshotUtils.snapshot when the input ends.
+ private final Path savepointPath;
+ // Created in open(); transient, so it is not part of the serialized operator.
+ private transient ContextImpl context;
+
+ public StateBootstrapOperator(long timestamp, Path savepointPath, StateBootstrapFunction<IN> function) {
+ super(function);
+
+ this.timestamp = timestamp;
+ this.savepointPath = savepointPath;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ context = new ContextImpl(getProcessingTimeService());
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ userFunction.processElement(element.getValue(), context);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ TaggedOperatorSubtaskState state = SnapshotUtils.snapshot(
+ this,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ timestamp,
+ getContainingTask().getCheckpointStorage(),
+ savepointPath);
+ // Emit the tagged snapshot as a regular stream record.
+ output.collect(new StreamRecord<>(state));
+ }
+
+ private class ContextImpl implements StateBootstrapFunction.Context {
+ private final ProcessingTimeService processingTimeService;
+
+ ContextImpl(ProcessingTimeService processingTimeService) {
+ this.processingTimeService = processingTimeService;
+ }
+
+ @Override
+ public long currentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/HashSelector.java
similarity index 53%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/HashSelector.java
index 3a2b7cb..1d7d4c4 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/HashSelector.java
@@ -16,36 +16,33 @@
* limitations under the License.
*/
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api.output.partitioner;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Preconditions;
/**
- * Returns metadata about a savepoint.
+ * A wrapper around a {@link KeySelector} that returns the {@link Object#hashCode()} of the returned
+ * key.
+ *
+ * @param <IN> Type of objects to extract the key from.
*/
@Internal
-public interface SavepointMetadata extends Serializable {
-
- /**
- * @return The max parallelism for the savepoint.
- */
- int maxParallelism();
-
- /**
- * @return Masters states for the savepoint.
- */
- Collection<MasterState> getMasterStates();
-
- /**
- * @return Operator state for the given UID.
- *
- * @throws IOException If the savepoint does not contain operator state with the given uid.
- */
- OperatorState getOperatorState(String uid) throws IOException;
+public class HashSelector<IN> implements KeySelector<IN, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final KeySelector<IN, ?> keySelector;
+
+ public HashSelector(KeySelector<IN, ?> keySelector) {
+ // checkNotNull returns its argument, so validation and assignment fit in one statement.
+ this.keySelector = Preconditions.checkNotNull(keySelector);
+ }
+
+ @Override
+ public Integer getKey(IN value) throws Exception {
+ return keySelector.getKey(value).hashCode(); // autoboxed hash; throws NullPointerException if the wrapped selector returns null
+ }
}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/KeyGroupRangePartitioner.java
similarity index 53%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/KeyGroupRangePartitioner.java
index 3a2b7cb..64bffa0 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/partitioner/KeyGroupRangePartitioner.java
@@ -16,36 +16,31 @@
* limitations under the License.
*/
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api.output.partitioner;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
/**
- * Returns metadata about a savepoint.
+ * A partitioner that selects the target channel based on the key group index.
*/
@Internal
-public interface SavepointMetadata extends Serializable {
+public class KeyGroupRangePartitioner implements Partitioner<Integer> {
+
+ private static final long serialVersionUID = 1L;
- /**
- * @return The max parallelism for the savepoint.
- */
- int maxParallelism();
+ private final int maxParallelism;
- /**
- * @return Masters states for the savepoint.
- */
- Collection<MasterState> getMasterStates();
+ public KeyGroupRangePartitioner(int maxParallelism) {
+ this.maxParallelism = maxParallelism;
+ }
- /**
- * @return Operator state for the given UID.
- *
- * @throws IOException If the savepoint does not contain operator state with the given uid.
- */
- OperatorState getOperatorState(String uid) throws IOException;
+ @Override
+ public int partition(Integer key, int numPartitions) {
+ // The key is treated as a key hash: map it to a key group, then to an operator index.
+ final int keyGroup = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(key, maxParallelism);
+ return KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+ maxParallelism, numPartitions, keyGroup);
+ }
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/BoundedStreamConfig.java
similarity index 52%
copy from flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
copy to flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/BoundedStreamConfig.java
index 3a2b7cb..6306447 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/BoundedStreamConfig.java
@@ -16,36 +16,33 @@
* limitations under the License.
*/
-package org.apache.flink.state.api.runtime.metadata;
+package org.apache.flink.state.api.runtime;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.graph.StreamConfig;
/**
- * Returns metadata about a savepoint.
+ * A {@link StreamConfig} with default settings.
*/
@Internal
-public interface SavepointMetadata extends Serializable {
+public class BoundedStreamConfig extends StreamConfig {
+ public BoundedStreamConfig() {
+ super(new Configuration());
- /**
- * @return The max parallelism for the savepoint.
- */
- int maxParallelism();
+ setChainStart();
+ setCheckpointingEnabled(true);
+ setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);
+ }
- /**
- * @return Masters states for the savepoint.
- */
- Collection<MasterState> getMasterStates();
+ public <IN> BoundedStreamConfig(TypeSerializer<?> keySerializer, KeySelector<IN, ?> keySelector) {
+ this();
- /**
- * @return Operator state for the given UID.
- *
- * @throws IOException If the savepoint does not contain operator state with the given uid.
- */
- OperatorState getOperatorState(String uid) throws IOException;
+ setStateKeySerializer(keySerializer);
+ setStatePartitioner(0, keySelector);
+ }
}
+
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
new file mode 100644
index 0000000..158fa35
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
@@ -0,0 +1,67 @@
+package org.apache.flink.state.api.runtime.metadata;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.BootstrapTransformation;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.types.Either;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link SavepointMetadata} whose set of operators can be changed after construction.
+ */
+@Internal
+public class ModifiableSavepointMetadata extends SavepointMetadata {
+ // NOTE(review): transient, so Java deserialization leaves this map null - confirm instances are never deserialized.
+ private transient Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> operatorStateIndex;
+
+ public ModifiableSavepointMetadata(int maxParallelism, Collection<MasterState> masterStates, Collection<OperatorState> initialStates) {
+ super(maxParallelism, masterStates);
+ // Index the existing operator states; they occupy the "left" side of the Either.
+ this.operatorStateIndex = new HashMap<>(initialStates.size());
+
+ initialStates.forEach(existing -> {
+ operatorStateIndex.put(existing.getOperatorID(), Either.Left(existing));
+ });
+ }
+
+ /**
+ * @return Operator state for the given UID.
+ *
+ * @throws IOException If the uid is unknown or refers to a bootstrap transformation instead of existing state.
+ */
+ public OperatorState getOperatorState(String uid) throws IOException {
+ final Either<OperatorState, BootstrapTransformation<?>> entry =
+ operatorStateIndex.get(OperatorIDGenerator.fromUid(uid));
+
+ if (entry != null && !entry.isRight()) {
+ return entry.left();
+ }
+
+ throw new IOException("Savepoint does not contain state with operator uid " + uid);
+ }
+
+ public void removeOperator(String uid) {
+ operatorStateIndex.remove(OperatorIDGenerator.fromUid(uid));
+ }
+
+ public void addOperator(String uid, BootstrapTransformation<?> transformation) {
+ final OperatorID operatorId = OperatorIDGenerator.fromUid(uid);
+ final boolean alreadyPresent = operatorStateIndex.containsKey(operatorId);
+ if (alreadyPresent) {
+ throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uid's must be unique");
+ }
+
+ operatorStateIndex.put(operatorId, Either.Right(transformation));
+ }
+
+ public Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> getOperatorStates() {
+ return operatorStateIndex;
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OnDiskSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OnDiskSavepointMetadata.java
deleted file mode 100644
index c4010d6..0000000
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OnDiskSavepointMetadata.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.state.api.runtime.metadata;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.state.api.runtime.OperatorIDGenerator;
-import org.apache.flink.state.api.runtime.SavepointLoader;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Returns metadata about an existing savepoint.
- */
-@Internal
-public class OnDiskSavepointMetadata implements SavepointMetadata {
-
- private static final long serialVersionUID = 3623389893479485802L;
-
- private final int maxParallelism;
-
- private final Collection<MasterState> masterStates;
-
- private final Map<OperatorID, OperatorState> operatorStateIndex;
-
- public OnDiskSavepointMetadata(String path) throws IOException {
- Savepoint savepoint = SavepointLoader.loadSavepoint(path);
-
- this.maxParallelism = savepoint
- .getOperatorStates()
- .stream()
- .map(OperatorState::getMaxParallelism)
- .max(Comparator.naturalOrder())
- .orElseThrow(() -> new RuntimeException("Savepoint's must contain at least one operator"));
-
- this.masterStates = savepoint.getMasterStates();
-
- Collection<OperatorState> operatorStates = savepoint.getOperatorStates();
- this.operatorStateIndex = new HashMap<>(operatorStates.size());
- operatorStates.forEach(state -> this.operatorStateIndex.put(state.getOperatorID(), state));
- }
-
- /**
- * @return The max parallelism for the savepoint.
- */
- @Override
- public int maxParallelism() {
- return maxParallelism;
- }
-
- /**
- * @return Masters states for the savepoint.
- */
- @Override
- public Collection<MasterState> getMasterStates() {
- return masterStates;
- }
-
- /**
- * @return Operator state for the given UID.
- */
- @Override
- public OperatorState getOperatorState(String uid) throws IOException {
- OperatorID operatorID = OperatorIDGenerator.fromUid(uid);
-
- OperatorState operatorState = operatorStateIndex.get(operatorID);
- if (operatorState == null) {
- throw new IOException("Savepoint does not contain state with operator uid " + uid);
- }
-
- return operatorState;
- }
-}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
index 3a2b7cb..40ce66a 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
@@ -20,9 +20,7 @@ package org.apache.flink.state.api.runtime.metadata;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
@@ -30,22 +28,30 @@ import java.util.Collection;
* Returns metadata about a savepoint.
*/
@Internal
-public interface SavepointMetadata extends Serializable {
+public class SavepointMetadata implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int maxParallelism;
+
+ private final Collection<MasterState> masterStates;
+
+ public SavepointMetadata(int maxParallelism, Collection<MasterState> masterStates) {
+ this.maxParallelism = maxParallelism;
+ this.masterStates = masterStates;
+ }
/**
* @return The max parallelism for the savepoint.
*/
- int maxParallelism();
+ public int maxParallelism() {
+ return maxParallelism;
+ }
/**
* @return Masters states for the savepoint.
*/
- Collection<MasterState> getMasterStates();
-
- /**
- * @return Operator state for the given UID.
- *
- * @throws IOException If the savepoint does not contain operator state with the given uid.
- */
- OperatorState getOperatorState(String uid) throws IOException;
+ public Collection<MasterState> getMasterStates() {
+ return masterStates;
+ }
}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
new file mode 100644
index 0000000..707ccde
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.Operator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests for bootstrap transformations.
+ */
+public class BootstrapTransformationTest extends AbstractTestBase {
+
+ @Test
+ public void testBroadcastStateTransformationParallelism() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(10);
+
+ DataSet<Integer> input = env.fromElements(0);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .transform(new ExampleBroadcastStateBootstrapFunction());
+
+ int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+ DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+ OperatorIDGenerator.fromUid("uid"),
+ new MemoryStateBackend(),
+ new Path(),
+ maxParallelism
+ );
+
+ Assert.assertEquals("Broadcast transformations should always be run at parallelism 1",
+ 1,
+ getParallelism(result));
+ }
+
+ @Test
+ public void testDefaultParallelismRespectedWhenLessThanMaxParallelism() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ DataSource<Integer> input = env.fromElements(0);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .transform(new ExampleStateBootstrapFunction());
+
+ int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(10, Collections.emptyList()));
+ DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+ OperatorIDGenerator.fromUid("uid"),
+ new MemoryStateBackend(),
+ new Path(),
+ maxParallelism
+ );
+
+ Assert.assertEquals(
+ "The parallelism of a data set should not change when less than the max parallelism of the savepoint",
+ ExecutionConfig.PARALLELISM_DEFAULT,
+ getParallelism(result));
+ }
+
+ @Test
+ public void testMaxParallelismRespected() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(10);
+
+ DataSource<Integer> input = env.fromElements(0);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .transform(new ExampleStateBootstrapFunction());
+
+ int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+ DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+ OperatorIDGenerator.fromUid("uid"),
+ new MemoryStateBackend(),
+ new Path(),
+ maxParallelism
+ );
+ // The environment parallelism (10) exceeds the savepoint max parallelism (4), so 4 is expected.
+ Assert.assertEquals(
+ "The parallelism of a data set should be constrained by the savepoint max parallelism",
+ 4,
+ getParallelism(result));
+ }
+
+ @Test
+ public void testOperatorSpecificMaxParallelismRespected() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ DataSource<Integer> input = env.fromElements(0);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .setMaxParallelism(1)
+ .transform(new ExampleStateBootstrapFunction());
+
+ int maxParallelism = transformation.getMaxParallelism(new SavepointMetadata(4, Collections.emptyList()));
+ DataSet<TaggedOperatorSubtaskState> result = transformation.writeOperatorSubtaskStates(
+ OperatorIDGenerator.fromUid("uid"),
+ new MemoryStateBackend(),
+ new Path(),
+ maxParallelism
+ );
+ // The operator-level max parallelism (1) is set below the savepoint's (4), so 1 is expected.
+ Assert.assertEquals("The parallelism of a data set should be constrained by the operator's max parallelism", 1, getParallelism(result));
+ }
+
+ private static <T> int getParallelism(DataSet<T> dataSet) {
+ // All concrete implementations of DataSet are operators, so this cast should always be safe.
+ return ((Operator<?, ?>) dataSet).getParallelism();
+ }
+
+ private static class ExampleBroadcastStateBootstrapFunction extends BroadcastStateBootstrapFunction<Integer> {
+
+ @Override
+ public void processElement(Integer value, Context ctx) throws Exception {
+ }
+ }
+
+ private static class ExampleStateBootstrapFunction extends StateBootstrapFunction<Integer> {
+
+ @Override
+ public void processElement(Integer value, Context ctx) throws Exception {
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ }
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
new file mode 100644
index 0000000..7d34244
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests the api of creating new savepoints.
+ */
+public class SavepointTest {
+
+ private static final String UID = "uid";
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNewSavepointEnforceUniqueUIDs() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(10);
+
+ DataSource<Integer> input = env.fromElements(0);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .transform(new ExampleStateBootstrapFunction());
+
+ ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
+
+ new NewSavepoint(metadata, new MemoryStateBackend())
+ .withOperator(UID, transformation)
+ .withOperator(UID, transformation);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testExistingSavepointEnforceUniqueUIDs() throws IOException {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(10);
+
+ DataSource<Integer> input = env.fromElements(0);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .transform(new ExampleStateBootstrapFunction());
+
+ Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
+ OperatorIDGenerator.fromUid(UID), 1, 4));
+
+ ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+
+ new ExistingSavepoint(env, metadata, new MemoryStateBackend())
+ .withOperator(UID, transformation)
+ .withOperator(UID, transformation);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testExistingSavepointEnforceUniqueUIDsWithOldSavepoint() throws IOException {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(10);
+
+ DataSource<Integer> input = env.fromElements(0);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .transform(new ExampleStateBootstrapFunction());
+
+ Collection<OperatorState> operatorStates = Collections.singletonList(new OperatorState(
+ OperatorIDGenerator.fromUid(UID), 1, 4));
+
+ ModifiableSavepointMetadata metadata = new ModifiableSavepointMetadata(4, Collections.emptyList(), operatorStates);
+
+ new ExistingSavepoint(env, metadata, new MemoryStateBackend())
+ .withOperator(UID, transformation)
+ .write("");
+ }
+
+ private static class ExampleStateBootstrapFunction extends StateBootstrapFunction<Integer> {
+
+ @Override
+ public void processElement(Integer value, Context ctx) throws Exception {
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ }
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
new file mode 100644
index 0000000..5621fac
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * IT test for writing savepoints.
+ */
+@RunWith(value = Parameterized.class)
+public class SavepointWriterITCase extends AbstractTestBase {
+ private static final String ACCOUNT_UID = "accounts";
+
+ private static final String CURRENCY_UID = "currency";
+
+ private static final String MODIFY_UID = "numbers";
+
+ private static final MapStateDescriptor<String, Double> descriptor = new MapStateDescriptor<>(
+ "currency-rate", Types.STRING, Types.DOUBLE);
+
+ private final StateBackend backend;
+
+ private static final Collection<Account> accounts = Arrays.asList(
+ new Account(1, 100.0),
+ new Account(2, 100.0),
+ new Account(3, 100.0));
+
+ private static final Collection<CurrencyRate> currencyRates = Arrays.asList(
+ new CurrencyRate("USD", 1.0),
+ new CurrencyRate("EUR", 1.3)
+ );
+
+ public SavepointWriterITCase(StateBackend backend) throws Exception {
+ this.backend = backend;
+
+ //reset the cluster so we can change the state backend
+ miniClusterResource.after();
+ miniClusterResource.before();
+ }
+
+ @Parameterized.Parameters(name = "Savepoint Writer: {0}")
+ public static Collection<StateBackend> data() {
+ return Arrays.asList(
+ new MemoryStateBackend(),
+ new RocksDBStateBackend((StateBackend) new MemoryStateBackend()));
+ }
+
+ @Test
+ public void testStateBootstrapAndModification() throws Exception {
+ final String savepointPath = getTempDirPath(new AbstractID().toHexString());
+
+ bootstrapState(savepointPath);
+
+ validateBootstrap(savepointPath);
+
+ final String modifyPath = getTempDirPath(new AbstractID().toHexString());
+
+ modifySavepoint(savepointPath, modifyPath);
+
+ validateModification(modifyPath);
+ }
+
+ private void bootstrapState(String savepointPath) throws Exception {
+ ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
+
+ BootstrapTransformation<Account> transformation = OperatorTransformation
+ .bootstrapWith(accountDataSet)
+ .keyBy(acc -> acc.id)
+ .transform(new AccountBootstrapper());
+
+ DataSet<CurrencyRate> currencyDataSet = bEnv.fromCollection(currencyRates);
+
+ BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
+ .bootstrapWith(currencyDataSet)
+ .transform(new CurrencyBootstrapFunction());
+
+ Savepoint
+ .create(backend, 128)
+ .withOperator(ACCOUNT_UID, transformation)
+ .withOperator(CURRENCY_UID, broadcastTransformation)
+ .write(savepointPath);
+
+ bEnv.execute("Bootstrap");
+ }
+
+ private void validateBootstrap(String savepointPath) throws ProgramInvocationException {
+ StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ sEnv.setStateBackend(backend);
+
+ CollectSink.accountList.clear();
+
+ sEnv.fromCollection(accounts)
+ .keyBy(acc -> acc.id)
+ .flatMap(new UpdateAndGetAccount())
+ .uid(ACCOUNT_UID)
+ .addSink(new CollectSink());
+
+ sEnv
+ .fromCollection(currencyRates)
+ .connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
+ .process(new CurrencyValidationFunction())
+ .uid(CURRENCY_UID)
+ .addSink(new DiscardingSink<>());
+
+ JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
+
+ ClusterClient<?> client = miniClusterResource.getClusterClient();
+ client.submitJob(jobGraph, SavepointWriterITCase.class.getClassLoader());
+
+ Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
+ }
+
+ private void modifySavepoint(String savepointPath, String modifyPath) throws Exception {
+ ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> data = bEnv.fromElements(1, 2, 3);
+
+ BootstrapTransformation<Integer> transformation = OperatorTransformation
+ .bootstrapWith(data)
+ .transform(new ModifyProcessFunction());
+
+ Savepoint
+ .load(bEnv, savepointPath, backend)
+ .removeOperator(CURRENCY_UID)
+ .withOperator(MODIFY_UID, transformation)
+ .write(modifyPath);
+
+ bEnv.execute("Modifying");
+ }
+
+ private void validateModification(String savepointPath) throws ProgramInvocationException {
+ StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ sEnv.setStateBackend(backend);
+
+ CollectSink.accountList.clear();
+
+ DataStream<Account> stream = sEnv.fromCollection(accounts)
+ .keyBy(acc -> acc.id)
+ .flatMap(new UpdateAndGetAccount())
+ .uid(ACCOUNT_UID);
+
+ stream.addSink(new CollectSink());
+
+ stream
+ .map(acc -> acc.id)
+ .map(new StatefulOperator())
+ .uid(MODIFY_UID)
+ .addSink(new DiscardingSink<>());
+
+ JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
+
+ ClusterClient<?> client = miniClusterResource.getClusterClient();
+ client.submitJob(jobGraph, SavepointWriterITCase.class.getClassLoader());
+
+ Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
+ }
+
+ /**
+ * A simple pojo holding an account id, amount and timestamp; equality and hashing ignore the timestamp.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class Account {
+ Account(int id, double amount) {
+ this.id = id;
+ this.amount = amount;
+ this.timestamp = 1000L;
+ }
+
+ public int id;
+
+ public double amount;
+
+ public long timestamp;
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Account && ((Account) obj).id == id && ((Account) obj).amount == amount;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, amount);
+ }
+ }
+
+ /**
+ * A simple pojo pairing a currency with its rate; equality and hashing use both fields and tolerate nulls.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class CurrencyRate {
+ public String currency;
+
+ public Double rate;
+
+ CurrencyRate(String currency, double rate) {
+ this.currency = currency;
+ this.rate = rate;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof CurrencyRate
+ && Objects.equals(((CurrencyRate) obj).currency, currency)
+ && Objects.equals(((CurrencyRate) obj).rate, rate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(currency, rate);
+ }
+ }
+
+ /**
+ * A keyed savepoint writer function that stores each account's amount in the "total" value state.
+ */
+ public static class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
+ ValueState<Double> state;
+
+ @Override
+ public void open(Configuration parameters) {
+ ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE);
+ state = getRuntimeContext().getState(descriptor);
+ }
+
+ @Override
+ public void processElement(Account value, Context ctx) throws Exception {
+ state.update(value.amount);
+ }
+ }
+
+ /**
+ * A streaming function that adds any existing "total" state to the incoming amount, stores the sum and emits the account.
+ */
+ public static class UpdateAndGetAccount extends RichFlatMapFunction<Account, Account> {
+ ValueState<Double> state;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE);
+ state = getRuntimeContext().getState(descriptor);
+ }
+
+ @Override
+ public void flatMap(Account value, Collector<Account> out) throws Exception {
+ Double current = state.value();
+ if (current != null) {
+ value.amount += current;
+ }
+
+ state.update(value.amount);
+ out.collect(value);
+ }
+ }
+
+ /**
+ * A bootstrap function that buffers its input and writes it to the "numbers" union list state on snapshot.
+ */
+ public static class ModifyProcessFunction extends StateBootstrapFunction<Integer> {
+ List<Integer> numbers;
+
+ ListState<Integer> state;
+
+ @Override
+ public void open(Configuration parameters) {
+ numbers = new ArrayList<>();
+ }
+
+ @Override
+ public void processElement(Integer value, Context ctx) {
+ numbers.add(value);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ state.clear();
+ state.addAll(numbers);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ state = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>("numbers", Types.INT)
+ );
+ }
+ }
+
+ /**
+ * A streaming function that, when restored, asserts the "numbers" union list state holds 1, 2 and 3 once each and nothing else.
+ */
+ public static class StatefulOperator extends RichMapFunction<Integer, Integer> implements CheckpointedFunction {
+ List<Integer> numbers;
+
+ ListState<Integer> state;
+
+ @Override
+ public void open(Configuration parameters) {
+ numbers = new ArrayList<>();
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ state.clear();
+ state.addAll(numbers);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ state = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>("numbers", Types.INT)
+ );
+
+ if (context.isRestored()) {
+ Set<Integer> expected = new HashSet<>();
+ expected.add(1);
+ expected.add(2);
+ expected.add(3);
+
+ for (Integer number : state.get()) {
+ Assert.assertTrue("Duplicate state", expected.contains(number));
+ expected.remove(number);
+ }
+
+ Assert.assertTrue("Failed to bootstrap all state elements: " + Arrays.toString(expected.toArray()), expected.isEmpty());
+ }
+ }
+
+ @Override
+ public Integer map(Integer value) {
+ return null;
+ }
+ }
+
+ /**
+ * A broadcast bootstrap function that puts each rate into the broadcast state keyed by its currency.
+ */
+ public static class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> {
+
+ @Override
+ public void processElement(CurrencyRate value, Context ctx) throws Exception {
+ ctx.getBroadcastState(descriptor).put(value.currency, value.rate);
+ }
+ }
+
+ /**
+ * Checks that the restored broadcast state holds the expected rate for every element it processes.
+ */
+ public static class CurrencyValidationFunction extends BroadcastProcessFunction<CurrencyRate, CurrencyRate, Void> {
+
+ @Override
+ public void processElement(CurrencyRate value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
+ Double restoredRate = ctx.getBroadcastState(descriptor).get(value.currency);
+ // Check presence first: unboxing a missing (null) rate below would surface as an opaque NPE.
+ Assert.assertNotNull("Missing currency rate for " + value.currency, restoredRate);
+ Assert.assertEquals(
+ "Incorrect currency rate", value.rate, restoredRate, 0.0001);
+ }
+
+ @Override
+ public void processBroadcastElement(CurrencyRate value, Context ctx, Collector<Void> out) {
+ //ignore
+ }
+ }
+
+ /**
+ * A sink that records the id of every account it receives in a shared static set.
+ */
+ public static class CollectSink implements SinkFunction<Account> {
+ static final Set<Integer> accountList = new ConcurrentSkipListSet<>();
+
+ @Override
+ public void invoke(Account value, Context context) {
+ accountList.add(value.id);
+ }
+ }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
new file mode 100644
index 0000000..7cc4b2f
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collections;
+
+/**
+ * Test for writing output savepoint metadata.
+ */
+public class SavepointOutputFormatTest {
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test(expected = IllegalStateException.class)
+ public void testSavepointOutputFormatOnlyWorksWithParallelismOne() throws Exception {
+ Path path = new Path(temporaryFolder.newFolder().getAbsolutePath());
+ SavepointOutputFormat format = createSavepointOutputFormat(path);
+
+ format.open(0, 2);
+ }
+
+ @Test
+ public void testSavepointOutputFormat() throws Exception {
+ Path path = new Path(temporaryFolder.newFolder().getAbsolutePath());
+ SavepointOutputFormat format = createSavepointOutputFormat(path);
+
+ Savepoint savepoint = createSavepoint();
+
+ format.open(0, 1);
+ format.writeRecord(savepoint);
+ format.close();
+
+ Savepoint savepointOnDisk = SavepointLoader.loadSavepoint(path.getPath());
+
+ Assert.assertEquals(
+ "Incorrect checkpoint id",
+ savepoint.getCheckpointId(),
+ savepointOnDisk.getCheckpointId());
+
+ Assert.assertEquals(
+ "Incorrect number of operator states in savepoint",
+ savepoint.getOperatorStates().size(),
+ savepointOnDisk.getOperatorStates().size());
+
+ Assert.assertEquals(
+ "Incorrect operator state in savepoint",
+ savepoint.getOperatorStates().iterator().next(),
+ savepointOnDisk.getOperatorStates().iterator().next());
+ }
+
+ private SavepointV2 createSavepoint() {
+ OperatorState operatorState = new OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 128);
+
+ operatorState.putState(0, new OperatorSubtaskState());
+ return new SavepointV2(0, Collections.singleton(operatorState), Collections.emptyList());
+ }
+
+ private SavepointOutputFormat createSavepointOutputFormat(Path path) throws Exception {
+ RuntimeContext ctx = new MockStreamingRuntimeContext(false, 1, 0);
+
+ SavepointOutputFormat format = new SavepointOutputFormat(path);
+ format.setRuntimeContext(ctx);
+
+ return format;
+ }
+}
+
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
new file mode 100644
index 0000000..01bcbbc
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Tests that {@link SnapshotUtils#snapshot} invokes an operator's snapshot callbacks in order. */
+public class SnapshotUtilsTest {
+
+	/** Operator callbacks that a snapshot is expected to trigger, in invocation order. */
+	private static final List<String> EXPECTED_CALL_OPERATOR_SNAPSHOT =
+		Arrays.asList("prepareSnapshotPreBarrier", "snapshotState", "notifyCheckpointComplete");
+
+	/** Callbacks actually recorded by {@link LifecycleOperator}; static, so reset before use. */
+	private static final List<String> ACTUAL_ORDER_TRACKING =
+		Collections.synchronizedList(new ArrayList<>(EXPECTED_CALL_OPERATOR_SNAPSHOT.size()));
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	@Test
+	public void testSnapshotUtilsLifecycle() throws Exception {
+		// Discard calls left over from an earlier run in the same JVM; the list is static.
+		ACTUAL_ORDER_TRACKING.clear();
+
+		StreamOperator<Void> operator = new LifecycleOperator();
+		CheckpointStorageWorkerView storage = new MockStateBackend().createCheckpointStorage(new JobID());
+		Path path = new Path(folder.newFolder().getAbsolutePath());
+		SnapshotUtils.snapshot(operator, 0, 0L, storage, path);
+
+		Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);
+	}
+
+	/** Operator stub whose methods each append their own name to the tracking list. */
+	private static class LifecycleOperator implements StreamOperator<Void> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void open() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("open");
+		}
+
+		@Override
+		public void close() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("close");
+		}
+
+		@Override
+		public void dispose() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("dispose");
+		}
+
+		@Override
+		public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("prepareSnapshotPreBarrier");
+		}
+
+		@Override
+		public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("snapshotState");
+			return new OperatorSnapshotFutures();
+		}
+
+		@Override
+		public void initializeState() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("initializeState");
+		}
+
+		@Override
+		public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("setKeyContextElement1");
+		}
+
+		@Override
+		public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("setKeyContextElement2");
+		}
+
+		@Override
+		public ChainingStrategy getChainingStrategy() {
+			ACTUAL_ORDER_TRACKING.add("getChainingStrategy");
+			return null;
+		}
+
+		@Override
+		public void setChainingStrategy(ChainingStrategy strategy) {
+			ACTUAL_ORDER_TRACKING.add("setChainingStrategy");
+		}
+
+		@Override
+		public MetricGroup getMetricGroup() {
+			ACTUAL_ORDER_TRACKING.add("getMetricGroup");
+			return null;
+		}
+
+		@Override
+		public OperatorID getOperatorID() {
+			ACTUAL_ORDER_TRACKING.add("getOperatorID");
+			return null;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("notifyCheckpointComplete");
+		}
+
+		@Override
+		public void setCurrentKey(Object key) {
+			ACTUAL_ORDER_TRACKING.add("setCurrentKey");
+		}
+
+		@Override
+		public Object getCurrentKey() {
+			ACTUAL_ORDER_TRACKING.add("getCurrentKey");
+			return null;
+		}
+	}
+}