You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:40 UTC

[flink] 09/11: [FLINK-12693] [state-processor] Improve Javadocs for user-facing APIs

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9815a7fe6d8a9facbc062457fb1ce525d6df427
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jul 4 12:27:38 2019 +0800

    [FLINK-12693] [state-processor] Improve Javadocs for user-facing APIs
    
    This commit improves Javadocs for the State Processor API that are
    either outdated, or not sufficiently informative.
---
 .../flink/state/api/BootstrapTransformation.java   | 17 +++++++++++--
 .../apache/flink/state/api/ExistingSavepoint.java  | 19 ++++++++++++++-
 .../state/api/KeyedOperatorTransformation.java     |  5 ++++
 .../org/apache/flink/state/api/NewSavepoint.java   |  5 +++-
 .../state/api/OneInputOperatorTransformation.java  |  4 +++-
 .../flink/state/api/OperatorTransformation.java    | 28 +++++++++++++++++++++-
 .../java/org/apache/flink/state/api/Savepoint.java |  6 +++--
 .../state/api/SavepointWriterOperatorFactory.java  | 13 +++++++++-
 .../apache/flink/state/api/WritableSavepoint.java  |  8 ++++++-
 9 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index c5ee559..a5163f3 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -46,22 +46,35 @@ import javax.annotation.Nullable;
 import java.util.OptionalInt;
 
 /**
- * Bootstrapped data that can be written into a {@code Savepoint}.
+ * A {@code BootstrapTransformation} represents a procedure of writing new operator state into a {@code Savepoint}.
+ * It is defined by a {@code DataSet} containing the data to bootstrap with, a factory for a stream operator
+ * that consumes the elements of the {@code DataSet} and generates state to be snapshotted, as well as an optional
+ * key selector if the new operator state is partitioned.
+ *
+ * @see OperatorTransformation
+ * @see OneInputOperatorTransformation
+ *
  * @param <T> The input type of the transformation.
  */
 @PublicEvolving
 @SuppressWarnings("WeakerAccess")
 public class BootstrapTransformation<T> {
+
+	/** The data set containing the data to bootstrap the operator state with. */
 	private final DataSet<T> dataSet;
 
+	/** Factory for the {@link StreamOperator} to consume and snapshot the bootstrapping data set. */
 	private final SavepointWriterOperatorFactory factory;
 
+	/** Partitioner for the bootstrapping data set. Only relevant if this bootstraps partitioned state. */
 	@Nullable
 	private final HashSelector<T> keySelector;
 
+	/** Type information for the key of the bootstrapped state. Only relevant if this bootstraps partitioned state. */
 	@Nullable
 	private final TypeInformation<?> keyType;
 
+	/** Local max parallelism for the bootstrapped operator. */
 	private final OptionalInt operatorMaxParallelism;
 
 	BootstrapTransformation(
@@ -134,7 +147,7 @@ public class BootstrapTransformation<T> {
 			config = new BoundedStreamConfig(keySerializer, keySelector);
 		}
 
-		StreamOperator<TaggedOperatorSubtaskState> operator = factory.getOperator(
+		StreamOperator<TaggedOperatorSubtaskState> operator = factory.createOperator(
 			System.currentTimeMillis(),
 			savepointPath);
 
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
index e82b579..de6205e 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
@@ -43,15 +43,32 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * An existing savepoint.
+ * An existing savepoint. This class provides the entry points for reading previously
+ * existing operator states in savepoints. Operator states can be removed
+ * from and added to the set of existing operator states, and eventually, written to
+ * distributed storage as a new savepoint.
+ *
+ * <p>New savepoints written using this class are based on the previously existing savepoint.
+ * For existing operators that remain untouched, the new savepoint only contains
+ * a shallow copy of pointers to state data that resides in the previously existing savepoint paths.
+ * As a result, both savepoints share state and one cannot be deleted without corrupting the other!
+ *
+ * @see WritableSavepoint
  */
 @PublicEvolving
 @SuppressWarnings("WeakerAccess")
 public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
+
+	/** The batch execution environment. Used for creating inputs for reading state. */
 	private final ExecutionEnvironment env;
 
+	/** The savepoint metadata, which maintains the current set of existing / newly added operator states. */
 	private final SavepointMetadata metadata;
 
+	/**
+	 * The state backend that was previously used to write existing operator states in this savepoint.
+	 * This is also the state backend that will be used when writing this existing savepoint again.
+	 */
 	private final StateBackend stateBackend;
 
 	ExistingSavepoint(ExecutionEnvironment env, SavepointMetadata metadata, StateBackend stateBackend) throws IOException {
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java
index 49825cf..d2cde56 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java
@@ -37,12 +37,17 @@ import java.util.OptionalInt;
 @PublicEvolving
 @SuppressWarnings("WeakerAccess")
 public class KeyedOperatorTransformation<K, T> {
+
+	/** The data set containing the data to bootstrap the operator state with. */
 	private final DataSet<T> dataSet;
 
+	/** Local max parallelism for the bootstrapped operator. */
 	private final OptionalInt operatorMaxParallelism;
 
+	/** Partitioner for the bootstrapping data set. */
 	private final KeySelector<T, K> keySelector;
 
+	/** Type information for the key of the bootstrapped operator. */
 	private final TypeInformation<K> keyType;
 
 	KeyedOperatorTransformation(
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
index dc371eb..a5f8d8e 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/NewSavepoint.java
@@ -23,7 +23,10 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 
 /**
- * A new savepoint.
+ * A new savepoint. Operator states can be removed from and added to the savepoint, and eventually, written to
+ * distributed storage as a new savepoint.
+ *
+ * @see WritableSavepoint
  */
 @PublicEvolving
 public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java
index 687eced..62ddd77 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java
@@ -44,15 +44,17 @@ import java.util.OptionalInt;
 @PublicEvolving
 @SuppressWarnings("WeakerAccess")
 public class OneInputOperatorTransformation<T> {
+
+	/** The data set containing the data to bootstrap the operator state with. */
 	private final DataSet<T> dataSet;
 
+	/** Local max parallelism for the bootstrapped operator. */
 	private OptionalInt operatorMaxParallelism = OptionalInt.empty();
 
 	OneInputOperatorTransformation(DataSet<T> dataSet) {
 		this.dataSet = dataSet;
 	}
 
-
 	/**
 	 * Sets the maximum parallelism of this operator.
 	 *
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
index 7bba046..983f94e 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
@@ -22,7 +22,33 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.DataSet;
 
 /**
- * An OperatorTransformation represents a single operator within a {@link Savepoint}.
+ * This class provides the entry point for building {@link BootstrapTransformation}s,
+ * which represent procedures to bootstrap new operator states with a given {@code DataSet}.
+ *
+ * <h2>Example usage</h2>
+ *
+ * <pre>{@code
+ *   DataSet<StateData> stateData = ...;
+ *
+ *   // to bootstrap non-keyed state:
+ *   BootstrapTransformation<StateData> nonKeyedStateBootstrap = OperatorTransformation
+ *       .bootstrapWith(stateData)
+ *       .transform(new StateBootstrapFunction<StateData>() {...});
+ *
+ *   // to bootstrap keyed state:
+ *   BootstrapTransformation<StateData> keyedStateBootstrap = OperatorTransformation
+ *       .bootstrapWith(stateData)
+ *       .keyBy(new KeySelector<StateData, KeyType>() {...})
+ *       .transform(new KeyedStateBootstrapFunction<KeyType, StateData>() {...});
+ * }</pre>
+ *
+ * <p>The code example above demonstrates how to create {@code BootstrapTransformation}s for non-keyed and keyed
+ * state. The built bootstrap transformations can then be registered with your {@link ExistingSavepoint} or
+ * {@link NewSavepoint} prior to writing the savepoint.
+ *
+ * @see OneInputOperatorTransformation
+ * @see KeyedOperatorTransformation
+ * @see BootstrapTransformation
  */
 @PublicEvolving
 @SuppressWarnings("WeakerAccess")
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
index a90d810..96e4005 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java
@@ -33,8 +33,10 @@ import java.util.Comparator;
 import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
 
 /**
- * A {@link Savepoint} is a collection of operator states that can be used to supply initial state
- * when starting a {@link org.apache.flink.streaming.api.datastream.DataStream} job.
+ * This class provides entry points for loading an existing savepoint, or creating a new empty savepoint.
+ *
+ * @see ExistingSavepoint
+ * @see NewSavepoint
  */
 @PublicEvolving
 public final class Savepoint {
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
index a2b3a60..fc78168 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriterOperatorFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.api;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -25,7 +26,17 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 /**
  * Creates a savepoint writing operator from a savepoint path.
  */
+@PublicEvolving
 @FunctionalInterface
 public interface SavepointWriterOperatorFactory {
-	StreamOperator<TaggedOperatorSubtaskState> getOperator(long timestamp, Path savepointPath);
+
+	/**
+	 * Creates a {@link StreamOperator} to be used for generating and snapshotting state.
+	 *
+	 * @param savepointTimestamp the timestamp to associate with the generated savepoint.
+	 * @param savepointPath the path to write the savepoint to.
+	 *
+	 * @return a stream operator for writing the savepoint.
+	 */
+	StreamOperator<TaggedOperatorSubtaskState> createOperator(long savepointTimestamp, Path savepointPath);
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index fb60779..cd24e63 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -31,15 +31,21 @@ import org.apache.flink.util.Preconditions;
 import java.util.List;
 
 /**
- * Any savepoint that can be written to from a batch context.
+ * A {@code WritableSavepoint} is any savepoint that can be written to from a batch context.
+ * Internally, a {@link SavepointMetadata} object is maintained that keeps track of the set
+ * of existing operator states in the savepoint, as well as newly added operator states defined by their
+ * {@link BootstrapTransformation}.
+ *
  * @param <F> The implementation type.
  */
 @PublicEvolving
 @SuppressWarnings("WeakerAccess")
 public abstract class WritableSavepoint<F extends WritableSavepoint> {
 
+	/** The savepoint metadata, which maintains the current set of existing / newly added operator states. */
 	protected final SavepointMetadata metadata;
 
+	/** The state backend to use when writing this savepoint. */
 	protected final StateBackend stateBackend;
 
 	WritableSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {