Posted to commits@flink.apache.org by al...@apache.org on 2020/06/03 13:06:27 UTC

[flink] branch master updated (127bb48 -> a3250c6)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 127bb48  [FLINK-17774][table] Supports all kinds of changes for select result
     new dc61816  [FLINK-17376] Use JavaSerializer instead of getSerializableListState()
     new 4ff270e  [FLINK-17376] Don't restore from Flink <= 1.2 state in Kafka connector
     new 276332e  [FLINK-17376] Remove deprecated state access methods
     new a3250c6  [FLINK-18032] Remove outdated sections in migration guide

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/migration.md                              | 456 +--------------------
 docs/dev/migration.zh.md                           | 456 +--------------------
 docs/dev/stream/state/state.md                     |  10 +-
 docs/dev/stream/state/state.zh.md                  |   9 +-
 .../connectors/fs/bucketing/BucketingSink.java     |   8 +-
 .../fs/bucketing/BucketingSinkMigrationTest.java   |   8 +-
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |  40 +-
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java |  38 --
 .../kafka/FlinkKafkaConsumerBaseTest.java          |  13 -
 ...er-migration-test-flink1.3-empty-state-snapshot | Bin 473 -> 0 bytes
 ...kafka-consumer-migration-test-flink1.3-snapshot | Bin 1255 -> 0 bytes
 .../connectors/rabbitmq/RMQSourceTest.java         |   3 +-
 .../flink/api/common/functions/RuntimeContext.java |  46 ---
 .../functions/util/AbstractRuntimeUDFContext.java  |  10 -
 .../flink/api/common/state/OperatorStateStore.java |  38 --
 .../flink/cep/operator/CepRuntimeContext.java      |   7 -
 .../flink/cep/operator/CepRuntimeContextTest.java  |  13 -
 .../state/api/runtime/SavepointRuntimeContext.java |  13 -
 .../runtime/state/DefaultOperatorStateBackend.java |  25 --
 .../runtime/state/OperatorStateBackendTest.java    |   4 +-
 .../api/functions/async/RichAsyncFunction.java     |   7 -
 .../source/ContinuousFileReaderOperator.java       |   8 +-
 .../source/MessageAcknowledgingSourceBase.java     |   8 +-
 .../api/operators/StreamingRuntimeContext.java     |   9 -
 .../runtime/operators/GenericWriteAheadSink.java   |  11 +-
 .../util/functions/StreamingFunctionUtils.java     |  20 +-
 .../api/functions/async/RichAsyncFunctionTest.java |  13 -
 .../api/operators/StreamingRuntimeContextTest.java |  28 --
 .../collect/utils/MockOperatorStateStore.java      |  11 -
 .../StatefulJobSavepointMigrationITCase.scala      |   2 +-
 ...StatefulJobWBroadcastStateMigrationITCase.scala |   6 +-
 31 files changed, 77 insertions(+), 1243 deletions(-)
 delete mode 100644 flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot
 delete mode 100644 flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot
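
Taken together, the four commits converge on one pattern: replace the
deprecated getSerializableListState() convenience method with an explicit
ListStateDescriptor. For user code doing the same migration, a minimal
sketch follows (the class and state names here are illustrative, not taken
from the patch):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MyStatefulFunction implements CheckpointedFunction {

	private transient ListState<String> bufferedElements;

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		// Before (removed in this push):
		// bufferedElements = context.getOperatorStateStore().getSerializableListState("buffer");

		// After: name the state and pass the serializer explicitly. JavaSerializer
		// preserves the serialized format of the old convenience method.
		bufferedElements = context.getOperatorStateStore().getListState(
			new ListStateDescriptor<>("buffer", new JavaSerializer<>()));
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// nothing extra to do; the list state is snapshotted by the state backend
	}
}

Note that the patch's own caveat applies here too: JavaSerializer lives in
flink-runtime, so for new state a proper TypeSerializer is preferable.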


[flink] 02/04: [FLINK-17376] Don't restore from Flink <= 1.2 state in Kafka connector

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ff270e1d977cfcd6db1804ae16fc9002184fab3
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu May 28 15:11:27 2020 +0200

    [FLINK-17376] Don't restore from Flink <= 1.2 state in Kafka connector
    
    This code was using the deprecated getSerializableListState(), which we
    want to remove.
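
For readers skimming the diff: with the pre-1.3 branch gone, the restore
path reduces to filtering the restored union state down to the partitions
owned by each subtask. Condensed from the hunk below (an excerpt, not a
drop-in snippet):

// in the consumer's partition-initialization path, after restoredState is populated
for (Map.Entry<KafkaTopicPartition, Long> entry : restoredState.entrySet()) {
	// the union state holds all partitions; keep only those assigned to this subtask
	if (KafkaTopicPartitionAssigner.assign(
			entry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
				== getRuntimeContext().getIndexOfThisSubtask()) {
		subscribedPartitionsToStartOffsets.put(entry.getKey(), entry.getValue());
	}
}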
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |  40 ++++-----------------
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java |  38 --------------------
 ...er-migration-test-flink1.3-empty-state-snapshot | Bin 473 -> 0 bytes
 ...kafka-consumer-migration-test-flink1.3-snapshot | Bin 1255 -> 0 bytes
 4 files changed, 6 insertions(+), 72 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index e8c0ae4..0e8c261 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -36,7 +36,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -193,12 +192,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/** Accessor for state in the operator state backend. */
 	private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
 
-	/**
-	 * Flag indicating whether the consumer is restored from older state written with Flink 1.1 or 1.2.
-	 * When the current run is restored from older state, partition discovery is disabled.
-	 */
-	private boolean restoredFromOldState;
-
 	/** Discovery loop, executed in a separate thread. */
 	private transient volatile Thread discoveryLoopThread;
 
@@ -566,17 +559,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			}
 
 			for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
-				if (!restoredFromOldState) {
-					// seed the partition discoverer with the union state while filtering out
-					// restored partitions that should not be subscribed by this subtask
-					if (KafkaTopicPartitionAssigner.assign(
-						restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
-							== getRuntimeContext().getIndexOfThisSubtask()){
-						subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
-					}
-				} else {
-					// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
-					// in this case, just use the restored state as the subscribed partitions
+				// seed the partition discoverer with the union state while filtering out
+				// restored partitions that should not be subscribed by this subtask
+				if (KafkaTopicPartitionAssigner.assign(
+					restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
+						== getRuntimeContext().getIndexOfThisSubtask()){
 					subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
 				}
 			}
@@ -907,27 +894,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 		OperatorStateStore stateStore = context.getOperatorStateStore();
 
-		ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
-			stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
-
 		this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
 			createStateSerializer(getRuntimeContext().getExecutionConfig())));
 
-		if (context.isRestored() && !restoredFromOldState) {
+		if (context.isRestored()) {
 			restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
 
-			// migrate from 1.2 state, if there is any
-			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
-				restoredFromOldState = true;
-				unionOffsetStates.add(kafkaOffset);
-			}
-			oldRoundRobinListState.clear();
-
-			if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
-				throw new IllegalArgumentException(
-					"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
-			}
-
 			// populate actual holder for restored state
 			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
 				restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 2dfe1af..79fa356 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.SerializedValue;
 
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -57,8 +56,6 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.doAnswer;
@@ -99,7 +96,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
 		return Arrays.asList(
-			MigrationVersion.v1_3,
 			MigrationVersion.v1_4,
 			MigrationVersion.v1_5,
 			MigrationVersion.v1_6,
@@ -325,40 +321,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		consumerOperator.cancel();
 	}
 
-	/**
-	 * Test restoring from savepoints before version Flink 1.3 should fail if discovery is enabled.
-	 */
-	@Test
-	public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
-		assumeTrue(testMigrateVersion == MigrationVersion.v1_3);
-
-		final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
-
-		final DummyFlinkKafkaConsumer<String> consumerFunction =
-			new DummyFlinkKafkaConsumer<>(TOPICS, partitions, 1000L); // discovery enabled
-
-		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
-			new StreamSource<>(consumerFunction);
-
-		final AbstractStreamOperatorTestHarness<String> testHarness =
-			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
-
-		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		testHarness.setup();
-
-		// restore state from binary snapshot file; should fail since discovery is enabled
-		try {
-			testHarness.initializeState(
-				OperatorSnapshotUtil.getResourceFilename(
-					"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
-
-			fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
-		} catch (Exception e) {
-			Assert.assertTrue(e instanceof IllegalArgumentException);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot
deleted file mode 100644
index 1a5aad1..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot and /dev/null differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot
deleted file mode 100644
index dc820ef..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot and /dev/null differ


[flink] 03/04: [FLINK-17376] Remove deprecated state access methods

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 276332e11277e7e50789e4eaed0f11ae59a0f8d1
Author: Ghildiyal <ma...@here.com>
AuthorDate: Sat May 16 16:48:38 2020 +0530

    [FLINK-17376] Remove deprecated state access methods
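
The removed FoldingState APIs have pointed users at AggregatingState since
Flink 1.4. A minimal sketch of the replacement (MyEvent and its getCount()
accessor are hypothetical names used only for illustration):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RunningSum extends RichFlatMapFunction<MyEvent, Long> {

	private transient AggregatingState<MyEvent, Long> sum;

	@Override
	public void open(Configuration parameters) {
		sum = getRuntimeContext().getAggregatingState(
			new AggregatingStateDescriptor<>(
				"sum",
				new AggregateFunction<MyEvent, Long, Long>() {
					@Override
					public Long createAccumulator() {
						return 0L;
					}

					@Override
					public Long add(MyEvent value, Long acc) {
						return acc + value.getCount();
					}

					@Override
					public Long getResult(Long acc) {
						return acc;
					}

					@Override
					public Long merge(Long a, Long b) {
						return a + b;
					}
				},
				Long.class));
	}

	@Override
	public void flatMap(MyEvent value, Collector<Long> out) throws Exception {
		sum.add(value); // runs the AggregateFunction on the current key's accumulator
		out.collect(sum.get());
	}
}

As with all keyed state, this is only usable on a KeyedStream.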
---
 docs/dev/stream/state/state.md                     | 10 +----
 docs/dev/stream/state/state.zh.md                  |  9 +----
 .../kafka/FlinkKafkaConsumerBaseTest.java          | 13 ------
 .../connectors/rabbitmq/RMQSourceTest.java         |  3 +-
 .../flink/api/common/functions/RuntimeContext.java | 46 ----------------------
 .../functions/util/AbstractRuntimeUDFContext.java  | 10 -----
 .../flink/api/common/state/OperatorStateStore.java | 38 ------------------
 .../flink/cep/operator/CepRuntimeContext.java      |  7 ----
 .../flink/cep/operator/CepRuntimeContextTest.java  | 13 ------
 .../state/api/runtime/SavepointRuntimeContext.java | 13 ------
 .../runtime/state/DefaultOperatorStateBackend.java | 25 ------------
 .../runtime/state/OperatorStateBackendTest.java    |  4 +-
 .../api/functions/async/RichAsyncFunction.java     |  7 ----
 .../api/operators/StreamingRuntimeContext.java     |  9 -----
 .../api/functions/async/RichAsyncFunctionTest.java | 13 ------
 .../api/operators/StreamingRuntimeContextTest.java | 28 -------------
 .../collect/utils/MockOperatorStateStore.java      | 11 ------
 .../StatefulJobSavepointMigrationITCase.scala      |  2 +-
 ...StatefulJobWBroadcastStateMigrationITCase.scala |  6 +--
 19 files changed, 10 insertions(+), 257 deletions(-)

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 728beab..389a03e 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -115,11 +115,6 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe
 of elements that are added to the state. The interface is the same as for `ListState` but elements
 added using `add(IN)` are aggregated using a specified `AggregateFunction`.
 
-* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
-added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
-of elements that are added to the state. The interface is similar to `ListState` but elements
-added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
-
 * `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
 retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
 `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
@@ -129,8 +124,6 @@ You can also use `isEmpty()` to check whether this map contains any key-value ma
 All types of state also have a method `clear()` that clears the state for the currently
 active key, i.e. the key of the input element.
 
-<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. Please use `AggregatingState` and `AggregatingStateDescriptor` instead.
-
 It is important to keep in mind that these state objects are only used for interfacing
 with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
 The second thing to keep in mind is that the value you get from the state
@@ -142,7 +135,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na
 that you can reference them), the type of the values that the state holds, and possibly
 a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
 want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
-a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.
+a `ReducingStateDescriptor`, or a `MapStateDescriptor`.
 
 State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
 Please see [here]({% link dev/user_defined_functions.md %}#rich-functions) for
@@ -153,7 +146,6 @@ is available in a `RichFunction` has these methods for accessing state:
 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
 * `ListState<T> getListState(ListStateDescriptor<T>)`
 * `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
-* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
 * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
 
 This is an example `FlatMapFunction` that shows how all of the parts fit together:
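
The state.md hunk above ends right where the page's example begins. For
reference, a reconstruction in the spirit of that example (not the verbatim
file contents): a keyed count-window average over ValueState.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

	// (count, running sum) for the current key
	private transient ValueState<Tuple2<Long, Long>> sum;

	@Override
	public void open(Configuration config) {
		sum = getRuntimeContext().getState(new ValueStateDescriptor<>(
			"average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})));
	}

	@Override
	public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
		Tuple2<Long, Long> current = sum.value();
		if (current == null) {
			current = Tuple2.of(0L, 0L);
		}

		current.f0 += 1;          // count
		current.f1 += input.f1;   // running sum
		sum.update(current);

		// emit the average every second element, then reset the window
		if (current.f0 >= 2) {
			out.collect(Tuple2.of(input.f0, current.f1 / current.f0));
			sum.clear();
		}
	}
}

Used on a keyed stream, e.g. stream.keyBy(value -> value.f0).flatMap(new CountWindowAverage()).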
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index 5fe4825..16e5e96 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -107,23 +107,17 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作
 * `AggregatingState<IN, OUT>`: 保留一个单值,表示添加到状态的所有值的聚合。和 `ReducingState` 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。
 接口与 `ListState` 类似,但使用 `add(IN)` 添加的元素会用指定的 `AggregateFunction` 进行聚合。
 
-* `FoldingState<T, ACC>`: 保留一个单值,表示添加到状态的所有值的聚合。 与 `ReducingState` 相反,聚合类型可能与添加到状态的元素类型不同。 
-接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。
-
 * `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。
  使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。
 
 所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
 
-<span class="label label-danger">注意</span> `FoldingState` 和 `FoldingStateDescriptor` 从 Flink 1.4 开始就已经被启用,将会在未来被删除。
-作为替代请使用 `AggregatingState` 和 `AggregatingStateDescriptor`。
-
 请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。
 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。
 
 你必须创建一个 `StateDescriptor`,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们),
 状态所持有值的类型,并且可能包含用户指定的函数,例如`ReduceFunction`。 根据不同的状态类型,可以创建`ValueStateDescriptor`,`ListStateDescriptor`,
-`ReducingStateDescriptor`,`FoldingStateDescriptor` 或 `MapStateDescriptor`。
+`ReducingStateDescriptor` 或 `MapStateDescriptor`。
 
 状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({% link dev/user_defined_functions.zh.md %}#rich-functions)获取相关信息,
 但是我们很快也会看到一个例子。`RichFunction` 中 `RuntimeContext` 提供如下方法:
@@ -132,7 +126,6 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作
 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
 * `ListState<T> getListState(ListStateDescriptor<T>)`
 * `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
-* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
 * `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
 
 下面是一个 `FlatMapFunction` 的例子,展示了如何将这些部分组合起来:
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 23de3cc..2501109 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -1389,19 +1389,6 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 		}
 
 		@Override
-		public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
-			// return empty state for the legacy 1.2 Kafka consumer state
-			return new TestingListState<>();
-		}
-
-		// ------------------------------------------------------------------------
-
-		@Override
-		public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
 		public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
 			throw new UnsupportedOperationException();
 		}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index b53723c..bb9ccf8 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -97,7 +98,7 @@ public class RMQSourceTest {
 		OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
 		FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
 		Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
-		Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
+		Mockito.when(mockStore.getListState(any(ListStateDescriptor.class))).thenReturn(null);
 		return mockContext;
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 18c6e5a..89e4bc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -30,8 +30,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -415,50 +413,6 @@ public interface RuntimeContext {
 	<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
 
 	/**
-	 * Gets a handle to the system's key/value folding state. This state is similar to the state
-	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
-	 * aggregates values with different types.
-	 *
-	 * <p>This state is only accessible if the function is executed on a KeyedStream.
-	 *
-	 * <pre>{@code
-	 * DataStream<MyType> stream = ...;
-	 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
-	 *
-	 * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
-	 *
-	 *     private FoldingState<MyType, Long> state;
-	 *
-	 *     public void open(Configuration cfg) {
-	 *         state = getRuntimeContext().getFoldingState(
-	 *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
-	 *     }
-	 *
-	 *     public Tuple2<MyType, Long> map(MyType value) {
-	 *         state.add(value);
-	 *         return new Tuple2<>(value, state.get());
-	 *     }
-	 * });
-	 *
-	 * }</pre>
-	 *
-	 * @param stateProperties The descriptor defining the properties of the stats.
-	 *
-	 * @param <T> Type of the values folded in the other state
-	 * @param <ACC> Type of the value in the state
-	 *
-	 * @return The partitioned state object.
-	 *
-	 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
-	 *                                       function (function is not part of a KeyedStream).
-	 *
-	 * @deprecated will be removed in a future version in favor of {@link AggregatingState}
-	 */
-	@PublicEvolving
-	@Deprecated
-	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
-
-	/**
 	 * Gets a handle to the system's key/value map state. This state is similar to the state
 	 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 	 * is composed of user-defined key-value pairs
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index f63c45e..f73b17f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -33,8 +33,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -228,14 +226,6 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
 	@Override
 	@PublicEvolving
-	@Deprecated
-	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-		throw new UnsupportedOperationException(
-				"This state is only accessible by functions executed on a KeyedStream");
-	}
-
-	@Override
-	@PublicEvolving
 	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index 7a998e6..aadbb23 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import java.io.Serializable;
 import java.util.Set;
 
 /**
@@ -111,41 +110,4 @@ public interface OperatorStateStore {
 	 * @return set of names for all registered broadcast states.
 	 */
 	Set<String> getRegisteredBroadcastStateNames();
-
-	// -------------------------------------------------------------------------------------------
-	//  Deprecated methods
-	// -------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates (or restores) a list state. Each state is registered under a unique name.
-	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
-	 *
-	 * <p>The items in the list are repartitionable by the system in case of changed operator parallelism.
-	 *
-	 * @param stateDescriptor The descriptor for this state, providing a name and serializer.
-	 * @param <S> The generic type of the state
-	 *
-	 * @return A list for all state partitions.
-	 *
-	 * @deprecated since 1.3.0. This was deprecated as part of a refinement to the function names.
-	 *             Please use {@link #getListState(ListStateDescriptor)} instead.
-	 */
-	@Deprecated
-	<S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
-	/**
-	 * Creates a state of the given name that uses Java serialization to persist the state. The items in the list
-	 * are repartitionable by the system in case of changed operator parallelism.
-	 *
-	 * <p>This is a simple convenience method. For more flexibility on how state serialization
-	 * should happen, use the {@link #getListState(ListStateDescriptor)} method.
-	 *
-	 * @param stateName The name of state to create
-	 * @return A list state using Java serialization to serialize state objects.
-	 *
-	 * @deprecated since 1.3.0. Using Java serialization for persisting state is not encouraged.
-	 *             Please use {@link #getListState(ListStateDescriptor)} instead.
-	 */
-	@Deprecated
-	<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
index af7ca0c..6c6ccc3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
@@ -31,8 +31,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -202,11 +200,6 @@ class CepRuntimeContext implements RuntimeContext {
 	}
 
 	@Override
-	public <T, ACC> FoldingState<T, ACC> getFoldingState(final FoldingStateDescriptor<T, ACC> stateProperties) {
-		throw new UnsupportedOperationException("State is not supported.");
-	}
-
-	@Override
 	public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
index b77dd94..c45f8a0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
@@ -23,11 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -179,17 +177,6 @@ public class CepRuntimeContextTest extends TestLogger {
 		}
 
 		try {
-			runtimeContext.getFoldingState(new FoldingStateDescriptor<>(
-				"foobar",
-				0,
-				mock(FoldFunction.class),
-				Integer.class));
-			fail("Expected getFoldingState to fail with unsupported operation exception.");
-		} catch (UnsupportedOperationException e) {
-			// expected
-		}
-
-		try {
 			runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class));
 			fail("Expected getMapState to fail with unsupported operation exception.");
 		} catch (UnsupportedOperationException e) {
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
index dc25468..fb16a77 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
@@ -31,8 +31,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -230,17 +228,6 @@ public final class SavepointRuntimeContext implements RuntimeContext {
 	}
 
 	@Override
-	@Deprecated
-	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-		if (!stateRegistrationAllowed) {
-			throw new RuntimeException(REGISTRATION_EXCEPTION_MSG);
-		}
-
-		registeredDescriptors.add(stateProperties);
-		return keyedStateStore.getFoldingState(stateProperties);
-	}
-
-	@Override
 	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
 		if (!stateRegistrationAllowed) {
 			throw new RuntimeException(REGISTRATION_EXCEPTION_MSG);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 48c8eb8..2e32f16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -220,31 +220,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}
 
 	// -------------------------------------------------------------------------------------------
-	//  Deprecated state access methods
-	// -------------------------------------------------------------------------------------------
-
-	/**
-	 * @deprecated This was deprecated as part of a refinement to the function names.
-	 *             Please use {@link #getListState(ListStateDescriptor)} instead.
-	 */
-	@Deprecated
-	@Override
-	public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-		return getListState(stateDescriptor);
-	}
-
-	/**
-	 * @deprecated Using Java serialization for persisting state is not encouraged.
-	 *             Please use {@link #getListState(ListStateDescriptor)} instead.
-	 */
-	@SuppressWarnings("unchecked")
-	@Deprecated
-	@Override
-	public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
-		return (ListState<T>) getListState(new ListStateDescriptor<>(stateName, deprecatedDefaultJavaSerializer));
-	}
-
-	// -------------------------------------------------------------------------------------------
 	//  Snapshot
 	// -------------------------------------------------------------------------------------------
 	@Nonnull
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 9ea2f98..981d6f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -787,7 +787,7 @@ public class OperatorStateBackendTest {
 		ListStateDescriptor<MutableType> stateDescriptor1 =
 				new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
 
-		ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+		ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);
 
 		listState1.add(MutableType.of(42));
 		listState1.add(MutableType.of(4711));
@@ -841,7 +841,7 @@ public class OperatorStateBackendTest {
 		ListStateDescriptor<MutableType> stateDescriptor1 =
 				new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
 
-		ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+		ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);
 
 		listState1.add(MutableType.of(42));
 		listState1.add(MutableType.of(4711));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 7b4ac6d..df379578 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -186,11 +184,6 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
 		}
 
 		@Override
-		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-			throw new UnsupportedOperationException("State is not supported in rich async functions.");
-		}
-
-		@Override
 		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
 			throw new UnsupportedOperationException("State is not supported in rich async functions.");
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index e451563..8f76a75 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -26,8 +26,6 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -214,13 +212,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	}
 
 	@Override
-	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
-		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
-		return keyedStateStore.getFoldingState(stateProperties);
-	}
-
-	@Override
 	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
 		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
 		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 2618e53..45c535d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -22,12 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -197,17 +195,6 @@ public class RichAsyncFunctionTest {
 		}
 
 		try {
-			runtimeContext.getFoldingState(new FoldingStateDescriptor<>("foobar", 0, new FoldFunction<Integer, Integer>() {
-				@Override
-				public Integer fold(Integer accumulator, Integer value) throws Exception {
-					return accumulator;
-				}
-			}, Integer.class));
-		} catch (UnsupportedOperationException e) {
-			// expected
-		}
-
-		try {
 			runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class));
 		} catch (UnsupportedOperationException e) {
 			// expected
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index fff4ef3..8ed41fc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -22,10 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
@@ -158,32 +156,6 @@ public class StreamingRuntimeContextTest {
 	}
 
 	@Test
-	public void testFoldingStateInstantiation() throws Exception {
-
-		final ExecutionConfig config = new ExecutionConfig();
-		config.registerKryoType(Path.class);
-
-		final AtomicReference<Object> descriptorCapture = new AtomicReference<>();
-
-		StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
-
-		@SuppressWarnings("unchecked")
-		FoldFunction<String, TaskInfo> folder = (FoldFunction<String, TaskInfo>) mock(FoldFunction.class);
-
-		FoldingStateDescriptor<String, TaskInfo> descr =
-				new FoldingStateDescriptor<>("name", null, folder, TaskInfo.class);
-
-		context.getFoldingState(descr);
-
-		FoldingStateDescriptor<?, ?> descrIntercepted = (FoldingStateDescriptor<?, ?>) descriptorCapture.get();
-		TypeSerializer<?> serializer = descrIntercepted.getSerializer();
-
-		// check that the Path class is really registered, i.e., the execution config was applied
-		assertTrue(serializer instanceof KryoSerializer);
-		assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
-	}
-
-	@Test
 	public void testListStateInstantiation() throws Exception {
 
 		final ExecutionConfig config = new ExecutionConfig();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
index 381eb80b..319f5c1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -73,16 +72,6 @@ public class MockOperatorStateStore implements OperatorStateStore {
 		throw new UnsupportedOperationException();
 	}
 
-	@Override
-	public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
-		return getListState(stateDescriptor);
-	}
-
-	@Override
-	public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
-		throw new UnsupportedOperationException();
-	}
-
 	public void checkpointBegin(long checkpointId) {
 		Map<String, TestUtils.MockListState> copiedStates = Collections.unmodifiableMap(copyStates(currentStateMap));
 		historyStateMap.put(checkpointId, copiedStates);
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 8c78b7a..d8ac9a7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -198,7 +198,7 @@ class StatefulJobSavepointMigrationITCase(
     }
 
     override def initializeState(context: FunctionInitializationContext): Unit = {
-      state = context.getOperatorStateStore.getOperatorState(
+      state = context.getOperatorStateStore.getListState(
         new ListStateDescriptor[CustomCaseClass](
           "sourceState", createTypeInformation[CustomCaseClass]))
     }
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 4acfa78..5ce76c4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -30,8 +30,8 @@ import org.apache.flink.api.scala.createTypeInformation
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -46,7 +46,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Assert, Ignore, Test}
 
-import scala.util.{Failure, Properties, Try}
+import scala.util.{Failure, Try}
 
 object StatefulJobWBroadcastStateMigrationITCase {
 
@@ -290,7 +290,7 @@ private class CheckpointedSource(val numElements: Int)
   }
 
   override def initializeState(context: FunctionInitializationContext): Unit = {
-    state = context.getOperatorStateStore.getOperatorState(
+    state = context.getOperatorStateStore.getListState(
       new ListStateDescriptor[CustomCaseClass](
         "sourceState", createTypeInformation[CustomCaseClass]))
   }


[flink] 01/04: [FLINK-17376] Use JavaSerializer instead of getSerializableListState()

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc6181667ac012afc0c68cfef825ec7bb30a7589
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu May 28 15:10:21 2020 +0200

    [FLINK-17376] Use JavaSerializer instead of getSerializableListState()
    
    We do this because we want to remove that method. We will still have to
    get rid of Java serialization entirely soon, though.
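
Why this swap is state-compatible: as the DefaultOperatorStateBackend hunk
in commit 03/04 shows, the deprecated method was itself only a thin wrapper,
effectively:

// effective behavior of the removed convenience method (see commit 03/04 above)
public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
	return (ListState<T>) getListState(new ListStateDescriptor<>(stateName, new JavaSerializer<>()));
}

so calling getListState() with an explicit JavaSerializer descriptor, as the
hunks below do, reads and writes the same state bytes.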
---
 .../connectors/fs/bucketing/BucketingSink.java       |  8 +++++++-
 .../fs/bucketing/BucketingSinkMigrationTest.java     |  8 +++++++-
 .../source/ContinuousFileReaderOperator.java         |  8 +++++++-
 .../source/MessageAcknowledgingSourceBase.java       |  8 +++++++-
 .../runtime/operators/GenericWriteAheadSink.java     | 11 +++++++++--
 .../util/functions/StreamingFunctionUtils.java       | 20 ++++++++++++++++----
 6 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 78cefaf..ad598a2 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -387,8 +389,12 @@ public class BucketingSink<T>
 			this.refTruncate = reflectTruncate(fs);
 		}
 
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
 		OperatorStateStore stateStore = context.getOperatorStateStore();
-		restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+		this.restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
 
 		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
 		if (context.isRestored()) {
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 5bd0fcf..cfa68dc 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -204,7 +206,11 @@ public class BucketingSinkMigrationTest {
 		public void initializeState(FunctionInitializationContext context) throws Exception {
 			OperatorStateStore stateStore = context.getOperatorStateStore();
 
-			ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+			// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+			// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+			// directly on flink-runtime. We are doing it here because we need to maintain backwards
+			// compatibility with old state and because we will have to rework/remove this code soon.
+			ListState<State<T>> restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
 
 			if (context.isRestored()) {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 87a028c..1b050f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -23,10 +23,12 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -243,7 +245,11 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
 		checkState(checkpointedState == null, "The reader state has already been initialized.");
 
-		checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
+		checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 		if (!context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 3a2a5ca..5b99194 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +29,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -138,9 +140,13 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 		Preconditions.checkState(this.checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
 		this.checkpointedState = context
 			.getOperatorStateStore()
-			.getSerializableListState("message-acknowledging-source-state");
+			.getListState(new ListStateDescriptor<>("message-acknowledging-source-state", new JavaSerializer<>()));
 
 		this.idsForCurrentCheckpoint = new HashSet<>(64);
 		this.pendingCheckpoints = new ArrayDeque<>();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4ad0fc6..889cec1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -26,6 +27,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -92,8 +94,13 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 		Preconditions.checkState(this.checkpointedState == null,
 			"The reader state has already been initialized.");
 
-		checkpointedState = context.getOperatorStateStore()
-			.getSerializableListState("pending-checkpoints");
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
+		checkpointedState = context
+							.getOperatorStateStore()
+							.getListState(new ListStateDescriptor<>("pending-checkpoints", new JavaSerializer<>()));
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 		if (context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
index 4482431..d9ea561 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -125,8 +127,13 @@ public final class StreamingFunctionUtils {
 			List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
 					snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
 
-			ListState<Serializable> listState = backend.
-					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+			// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+			// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+			// directly on flink-runtime. We are doing it here because we need to maintain backwards
+			// compatibility with old state and because we will have to rework/remove this code soon.
+			ListStateDescriptor<Serializable> listStateDescriptor =
+				new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+			ListState<Serializable> listState = backend.getListState(listStateDescriptor);
 
 			listState.clear();
 
@@ -184,8 +191,13 @@ public final class StreamingFunctionUtils {
 			@SuppressWarnings("unchecked")
 			ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
 
-			ListState<Serializable> listState = context.getOperatorStateStore().
-					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+			// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+			// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+			// directly on flink-runtime. We are doing it here because we need to maintain backwards
+			// compatibility with old state and because we will have to rework/remove this code soon.
+			ListStateDescriptor<Serializable> listStateDescriptor =
+				new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+			ListState<Serializable> listState = context.getOperatorStateStore().getListState(listStateDescriptor);
 
 			List<Serializable> list = new ArrayList<>();
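
The changes in this commit all follow one pattern. As a reference for user code that still calls the removed `getSerializableListState()`, here is a minimal sketch of the replacement; the class `MyStatefulFunction`, the state type `MyState`, and the state name `my-state` are illustrative and not part of the patch:

{% highlight java %}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MyStatefulFunction implements CheckpointedFunction {

    private transient ListState<MyState> checkpointedState;
    private final List<MyState> buffered = new ArrayList<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Old (removed): context.getOperatorStateStore().getSerializableListState("my-state");
        // New: an explicit descriptor with JavaSerializer. JavaSerializer writes state
        // with plain Java serialization, exactly like the removed method did, so state
        // taken with the old call remains restorable.
        checkpointedState = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("my-state", new JavaSerializer<>()));

        if (context.isRestored()) {
            for (MyState element : checkpointedState.get()) {
                buffered.add(element);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (MyState element : buffered) {
            checkpointedState.add(element);
        }
    }

    /** Hypothetical state type; JavaSerializer requires it to be Serializable. */
    public static class MyState implements Serializable {
        private static final long serialVersionUID = 1L;
    }
}
{% endhighlight %}

Note that, as the comments added throughout the patch stress, `JavaSerializer` lives in flink-runtime, so depending on it from API or connector code is a stopgap for backwards compatibility rather than a recommended long-term pattern.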
 


[flink] 04/04: [FLINK-18032] Remove outdated sections in migration guide

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a3250c6a82a71f7bd9617cbf085bb40393576292
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Fri May 29 10:53:14 2020 +0200

    [FLINK-18032] Remove outdated sections in migration guide
---
 docs/dev/migration.md    | 456 +----------------------------------------------
 docs/dev/migration.zh.md | 456 +----------------------------------------------
 2 files changed, 8 insertions(+), 904 deletions(-)

diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index 288e2ef..09556c1 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -25,6 +25,10 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
+See the [older migration
+guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html)
+for information about migrating from versions older than Flink 1.3.
+
 ## Migrating from Flink 1.3+ to Flink 1.7
 
 ### API changes for serializer snapshots
@@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b
 in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see
 [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17).
 
-## Migrating from Flink 1.2 to Flink 1.3
-
-A few APIs have changed since Flink 1.2. Most of the changes are documented in their
-specific documentation. The following is a consolidated list of API changes and links to details for migration when
-upgrading to Flink 1.3.
-
-### `TypeSerializer` interface changes
-
-This is mostly relevant for users implementing custom `TypeSerializer`s for their state.
-
-Since Flink 1.3, two additional methods have been added that are related to serializer compatibility
-across savepoint restores. Please see
-[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility)
-for further details on how to implement these methods.
-
-### `ProcessFunction` is always a `RichFunction`
-
-In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` were introduced.
-Since Flink 1.3, `RichProcessFunction` was removed and `ProcessFunction` is now always a `RichFunction` with access to
-the lifecycle methods and runtime context.
-
-### Flink CEP library API changes
-
-The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.
-Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details.
-
-### Logger dependencies removed from Flink core artifacts
-
-In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are
-now free of specific logger dependencies.
-
-Example and quickstart archetypes already have loggers specified and should not be affected.
-For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.slf4j</groupId>
-    <artifactId>slf4j-log4j12</artifactId>
-    <version>1.7.7</version>
-</dependency>
-
-<dependency>
-    <groupId>log4j</groupId>
-    <artifactId>log4j</artifactId>
-    <version>1.2.17</version>
-</dependency>
-{% endhighlight %}
-
-## Migrating from Flink 1.1 to Flink 1.2
-
-As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state:
-**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
-both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1
-function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
-deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
-
-The migration process will serve two goals:
-
-1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
-
-2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
-Flink 1.1 predecessor.
-
-After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
-simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to
-your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
-Flink 1.1 predecessor left off.
-
-### Example User Functions
-
-As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
-functions. The first is an example of a function with **keyed** state, while
-the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        counter = getRuntimeContext().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-}
-
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private ArrayList<Tuple2<String, Integer>> bufferedElements;
-
-    BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public ArrayList<Tuple2<String, Integer>> snapshotState(
-        long checkpointId, long checkpointTimestamp) throws Exception {
-        return bufferedElements;
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-
-The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
-`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
-the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
-containing the word itself and the number of occurrences.
-
-The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
-and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
-This is a common way to avoid many expensive calls to a database or an external storage system. To do the
-buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
-periodically checkpointed.
-
-### State API Migration
-
-To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
-After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you
-are guaranteed that the new version of your job will start from where its predecessor left off.
-
-**Keyed State:** Something to note before delving into the details of the migration process is that if your function
-has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support
-for the new features and full backwards compatibility. Changes could be made just for better code organization,
-but this is purely a matter of style.
-
-With the above said, the rest of this section focuses on the **non-keyed state**.
-
-#### Rescaling and new state abstractions
-
-The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
-to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
-`Checkpointed` one.
-
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
-the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html).
-
-##### ListCheckpointed
-
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
-is that `snapshotState()` should now return a list of objects to checkpoint, as stated earlier, and
-`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always
-return `Collections.singletonList(MY_STATE)` from `snapshotState()`. The updated code for `BufferingSink`
-is included below:
-
-{% highlight java %}
-public class BufferingSinkListCheckpointed implements
-        SinkFunction<Tuple2<String, Integer>>,
-        ListCheckpointed<Tuple2<String, Integer>>,
-        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSinkListCheckpointed(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        this.bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public List<Tuple2<String, Integer>> snapshotState(
-            long checkpointId, long timestamp) throws Exception {
-        return this.bufferedElements;
-    }
-
-    @Override
-    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-        if (!state.isEmpty()) {
-            this.bufferedElements.addAll(state);
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
-compatibility reasons and more details will be explained at the end of this section.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface likewise requires the implementation of two methods:
-
-{% highlight java %}
-void snapshotState(FunctionSnapshotContext context) throws Exception;
-
-void initializeState(FunctionInitializationContext context) throws Exception;
-{% endhighlight %}
-
-As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
-the counterpart of `restoreState()`) is called every time the user-defined function is initialized, rather than only
-when recovering from a failure. Given this, `initializeState()` is not only the place where different
-types of state are initialized, but also where state recovery logic is included. An implementation of the
-`CheckpointedFunction` interface for `BufferingSink` is presented below.
-
-{% highlight java %}
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        checkpointedState.clear();
-        for (Tuple2<String, Integer> element : bufferedElements) {
-            checkpointedState.add(element);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
-
-        if (context.isRestored()) {
-            for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                bufferedElements.add(element);
-            }
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-The `initializeState()` method takes a `FunctionInitializationContext` as an argument. This is used to initialize
-the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
-are going to be stored upon checkpointing:
-
-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
-
-After initializing the container, we use the `isRestored()` method of the context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
-
-As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
-of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
-
-As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
-using the `FunctionInitializationContext` given as an argument, instead of the `RuntimeContext`, as was the case
-in Flink 1.1. If the `CheckpointedFunction` interface were to be used in the `CountMapper` example,
-the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
-would look like this:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-        implements CheckpointedFunction {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // all managed, nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        counter = context.getKeyedStateStore().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-}
-{% endhighlight %}
-
-Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
-upon checkpointing.
-
-#### Backwards compatibility with Flink 1.1
-
-So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
-The question that remains is "Can I make sure that my modified (Flink 1.2) job will start from where my already
-running job from Flink 1.1 stopped?".
-
-The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing.
-Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
-implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the
-familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of
-the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
-
-### Aligned Processing Time Window Operators
-
-In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
-the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
-either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
-these operators are referred to as *aligned* window operators as they assume their input elements arrive in
-order. This is valid when operating in processing time, as an element's timestamp is the wall-clock time at
-the moment it arrives at the window operator. These operators were restricted to using the memory state backend, and
-had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.
-
-In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
-`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
-read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
-that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
-
-<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
-in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
-`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
-windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
-resume execution from a Flink 1.1 savepoint while using these operators.
-
-<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
-and **no backwards compatibility** with Flink 1.1.
-
-The code to use the aligned window operators in Flink 1.2 is presented below:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// for tumbling windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// for tumbling windows
-val window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-val window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-</div>
-
 {% top %}
diff --git a/docs/dev/migration.zh.md b/docs/dev/migration.zh.md
index fb1c559..76b4e8c 100644
--- a/docs/dev/migration.zh.md
+++ b/docs/dev/migration.zh.md
@@ -25,6 +25,10 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
+See the [older migration
+guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html)
+for information about migrating from versions older than Flink 1.3.
+
 ## Migrating from Flink 1.3+ to Flink 1.7
 
 ### API changes for serializer snapshots
@@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b
 in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see
 [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17).
 
-## Migrating from Flink 1.2 to Flink 1.3
-
-A few APIs have changed since Flink 1.2. Most of the changes are documented in their
-specific documentation. The following is a consolidated list of API changes and links to details for migration when
-upgrading to Flink 1.3.
-
-### `TypeSerializer` interface changes
-
-This is mostly relevant for users implementing custom `TypeSerializer`s for their state.
-
-Since Flink 1.3, two additional methods have been added that are related to serializer compatibility
-across savepoint restores. Please see
-[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility)
-for further details on how to implement these methods.
-
-### `ProcessFunction` is always a `RichFunction`
-
-In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` were introduced.
-Since Flink 1.3, `RichProcessFunction` was removed and `ProcessFunction` is now always a `RichFunction` with access to
-the lifecycle methods and runtime context.
-
-### Flink CEP library API changes
-
-The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.
-Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details.
-
-### Logger dependencies removed from Flink core artifacts
-
-In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are
-now free of specific logger dependencies.
-
-Example and quickstart archetypes already have loggers specified and should not be affected.
-For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.slf4j</groupId>
-    <artifactId>slf4j-log4j12</artifactId>
-    <version>1.7.7</version>
-</dependency>
-
-<dependency>
-    <groupId>log4j</groupId>
-    <artifactId>log4j</artifactId>
-    <version>1.2.17</version>
-</dependency>
-{% endhighlight %}
-
-## Migrating from Flink 1.1 to Flink 1.2
-
-As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state:
-**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
-both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1
-function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
-deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
-
-The migration process will serve two goals:
-
-1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
-
-2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
-Flink 1.1 predecessor.
-
-After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
-simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to
-your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
-Flink 1.1 predecessor left off.
-
-### Example User Functions
-
-As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
-functions. The first is an example of a function with **keyed** state, while
-the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        counter = getRuntimeContext().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-}
-
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private ArrayList<Tuple2<String, Integer>> bufferedElements;
-
-    BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public ArrayList<Tuple2<String, Integer>> snapshotState(
-        long checkpointId, long checkpointTimestamp) throws Exception {
-        return bufferedElements;
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-
-The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
-`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
-the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
-containing the word itself and the number of occurrences.
-
-The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
-and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
-This is a common way to avoid many expensive calls to a database or an external storage system. To do the
-buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
-periodically checkpointed.
-
-### State API Migration
-
-To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
-After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you
-are guaranteed that the new version of your job will start from where its predecessor left off.
-
-**Keyed State:** Something to note before delving into the details of the migration process is that if your function
-has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support
-for the new features and full backwards compatibility. Changes could be made just for better code organization,
-but this is purely a matter of style.
-
-With the above said, the rest of this section focuses on the **non-keyed state**.
-
-#### Rescaling and new state abstractions
-
-The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
-to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
-`Checkpointed` one.
-
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
-the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html).
-
-##### ListCheckpointed
-
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
-is that `snapshotState()` should now return a list of objects to checkpoint, as stated earlier, and
-`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always
-return `Collections.singletonList(MY_STATE)` from `snapshotState()`. The updated code for `BufferingSink`
-is included below:
-
-{% highlight java %}
-public class BufferingSinkListCheckpointed implements
-        SinkFunction<Tuple2<String, Integer>>,
-        ListCheckpointed<Tuple2<String, Integer>>,
-        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSinkListCheckpointed(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        this.bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public List<Tuple2<String, Integer>> snapshotState(
-            long checkpointId, long timestamp) throws Exception {
-        return this.bufferedElements;
-    }
-
-    @Override
-    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-        if (!state.isEmpty()) {
-            this.bufferedElements.addAll(state);
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
-compatibility reasons and more details will be explained at the end of this section.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface likewise requires the implementation of two methods:
-
-{% highlight java %}
-void snapshotState(FunctionSnapshotContext context) throws Exception;
-
-void initializeState(FunctionInitializationContext context) throws Exception;
-{% endhighlight %}
-
-As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
-the counterpart of `restoreState()`) is called every time the user-defined function is initialized, rather than only
-when recovering from a failure. Given this, `initializeState()` is not only the place where different
-types of state are initialized, but also where state recovery logic is included. An implementation of the
-`CheckpointedFunction` interface for `BufferingSink` is presented below.
-
-{% highlight java %}
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        checkpointedState.clear();
-        for (Tuple2<String, Integer> element : bufferedElements) {
-            checkpointedState.add(element);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
-
-        if (context.isRestored()) {
-            for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                bufferedElements.add(element);
-            }
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-The `initializeState()` method takes a `FunctionInitializationContext` as an argument. This is used to initialize
-the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
-are going to be stored upon checkpointing:
-
-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
-
-After initializing the container, we use the `isRestored()` method of the context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
-
-As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
-of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
-
-As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
-using the `FunctionInitializationContext` given as an argument, instead of the `RuntimeContext`, as was the case
-in Flink 1.1. If the `CheckpointedFunction` interface were to be used in the `CountMapper` example,
-the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
-would look like this:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-        implements CheckpointedFunction {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // all managed, nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        counter = context.getKeyedStateStore().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-}
-{% endhighlight %}
-
-Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
-upon checkpointing.
-
-#### Backwards compatibility with Flink 1.1
-
-So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
-The question that remains is "Can I make sure that my modified (Flink 1.2) job will start from where my already
-running job from Flink 1.1 stopped?".
-
-The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing.
-Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
-implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the
-familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of
-the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
-
-### Aligned Processing Time Window Operators
-
-In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
-the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
-either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
-these operators are referred to as *aligned* window operators as they assume their input elements arrive in
-order. This is valid when operating in processing time, as an element's timestamp is the wall-clock time at
-the moment it arrives at the window operator. These operators were restricted to using the memory state backend, and
-had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.
-
-In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
-`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
-read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
-that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
-
-<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
-in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
-`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
-windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
-resume execution from a Flink 1.1 savepoint while using these operators.
-
-<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
-and **no backwards compatibility** with Flink 1.1.
-
-The code to use the aligned window operators in Flink 1.2 is presented below:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// for tumbling windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// for tumbling windows
-val window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-val window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-</div>
-
 {% top %}
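
As for the serializer-snapshot pointer that the trimmed guide keeps (the "API changes for serializer snapshots" section retained above), the post-1.7 replacement for `TypeSerializerConfigSnapshot` looks roughly like the sketch below. This is a hedged illustration, not code from this patch: `MyType` and `MyTypeSerializer` are hypothetical, and `SimpleTypeSerializerSnapshot` only fits serializers that carry no configurable state:

{% highlight java %}
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;

// Hypothetical snapshot for a hypothetical stateless serializer
// MyTypeSerializer that serializes values of type MyType.
public class MyTypeSerializerSnapshot extends SimpleTypeSerializerSnapshot<MyType> {

    public MyTypeSerializerSnapshot() {
        // The supplier is used to re-create the serializer on restore.
        super(MyTypeSerializer::new);
    }
}

// In the serializer itself, the deprecated TypeSerializerConfigSnapshot hook
// is then replaced by:
//
//     @Override
//     public TypeSerializerSnapshot<MyType> snapshotConfiguration() {
//         return new MyTypeSerializerSnapshot();
//     }
{% endhighlight %}

Serializers whose behavior depends on configuration need a full `TypeSerializerSnapshot` implementation instead; the custom serialization docs linked from the retained section cover that case.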