Posted to commits@flink.apache.org by sj...@apache.org on 2021/05/04 20:30:24 UTC

[flink] branch release-1.13 updated: [FLINK-22563][docs] Add migration guide for new StateBackend interfaces

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ff10055  [FLINK-22563][docs] Add migration guide for new StateBackend interfaces
ff10055 is described below

commit ff1005524641dc5cdcb8188aa9528f6c1a4b92a8
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue May 4 12:58:56 2021 -0500

    [FLINK-22563][docs] Add migration guide for new StateBackend interfaces
    
    Co-authored-by: Nico Kruber <ni...@gmail.com>
    
    This closes #15831
---
 docs/content.zh/docs/ops/state/state_backends.md | 129 +++++++++++++++++++++++
 docs/content/docs/ops/state/state_backends.md    | 129 +++++++++++++++++++++++
 2 files changed, 258 insertions(+)

diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index d1343f6..57ae844 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -331,3 +331,132 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 ```
 
 {{< top >}}
+
+## Migrating from Legacy Backends
+
+Beginning in **Flink 1.13**, the community reworked its public state backend classes to help users better understand the separation of local state storage and checkpoint storage.
+This change does not affect the runtime implementation or characteristics of Flink's state backend or checkpointing process; it only makes the intent of each class clearer.
+Users can migrate existing applications to use the new API without losing any state or consistency. 
+
+### MemoryStateBackend
+
+The legacy `MemoryStateBackend` is equivalent to using [`HashMapStateBackend`](#the-hashmapstatebackend) and [`JobManagerCheckpointStorage`]({{< ref "docs/ops/state/checkpoints#the-jobmanagercheckpointstorage" >}}).
+
+#### `flink-conf.yaml` configuration 
+
+```yaml
+state.backend: hashmap
+
+# Optional, Flink will automatically default to JobManagerCheckpointStorage
+# when no checkpoint directory is specified.
+state.checkpoint-storage: jobmanager
+```
+
+#### Code Configuration
+
+{{< tabs "memorystatebackendmigration" >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new HashMapStateBackend());
+env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new HashMapStateBackend)
+env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage)
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+### FsStateBackend 
+
+The legacy `FsStateBackend` is equivalent to using [`HashMapStateBackend`](#the-hashmapstatebackend) and [`FileSystemCheckpointStorage`]({{< ref "docs/ops/state/checkpoints#the-filesystemcheckpointstorage" >}}).
+
+#### `flink-conf.yaml` configuration
+
+```yaml
+state.backend: hashmap
+state.checkpoints.dir: file:///checkpoint-dir/
+
+# Optional, Flink will automatically default to FileSystemCheckpointStorage
+# when a checkpoint directory is specified.
+state.checkpoint-storage: filesystem
+```
+
+#### Code Configuration
+
+{{< tabs "memorystatebackendmigration" >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new HashMapStateBackend());
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+
+
+// Advanced FsStateBackend configurations, such as write buffer size
+// can be set by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new HashMapStateBackend)
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
+
+
+// Advanced FsStateBackend configurations, such as write buffer size
+// can be set by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+### RocksDBStateBackend 
+
+The legacy `RocksDBStateBackend` is equivalent to using [`EmbeddedRocksDBStateBackend`](#the-embeddedrocksdbstatebackend) and [`FileSystemCheckpointStorage`]({{< ref "docs/ops/state/checkpoints#the-filesystemcheckpointstorage" >}}).
+
+#### `flink-conf.yaml` configuration 
+
+```yaml
+state.backend: rocksdb
+state.checkpoints.dir: file:///checkpoint-dir/
+
+# Optional, Flink will automatically default to FileSystemCheckpointStorage
+# when a checkpoint directory is specified.
+state.checkpoint-storage: filesystem
+```
+
+#### Code Configuration
+
+{{< tabs "memorystatebackendmigration" >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new EmbeddedRocksDBStateBackend());
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+
+
+// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
+// to specify advanced checkpointing configurations such as write buffer size,
+// you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new EmbeddedRocksDBStateBackend)
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
+
+
+// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
+// to specify advanced checkpointing configurations such as write buffer size,
+// you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
+{{< /tabs>}}
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index d121e7e..cdcf603 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -322,3 +322,132 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 ```
 
 {{< top >}}
+
+## Migrating from Legacy Backends
+
+Beginning in **Flink 1.13**, the community reworked its public state backend classes to help users better understand the separation of local state storage and checkpoint storage.
+This change does not affect the runtime implementation or characteristics of Flink's state backend or checkpointing process; it only makes the intent of each class clearer.
+Users can migrate existing applications to use the new API without losing any state or consistency. 
+
+### MemoryStateBackend
+
+The legacy `MemoryStateBackend` is equivalent to using [`HashMapStateBackend`](#the-hashmapstatebackend) and [`JobManagerCheckpointStorage`]({{< ref "docs/ops/state/checkpoints#the-jobmanagercheckpointstorage" >}}).
+
+#### `flink-conf.yaml` configuration 
+
+```yaml
+state.backend: hashmap
+
+# Optional, Flink will automatically default to JobManagerCheckpointStorage
+# when no checkpoint directory is specified.
+state.checkpoint-storage: jobmanager
+```
+
+#### Code Configuration
+
+{{< tabs "memorystatebackendmigration" >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new HashMapStateBackend());
+env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new HashMapStateBackend)
+env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage)
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+### FsStateBackend 
+
+The legacy `FsStateBackend` is equivalent to using [`HashMapStateBackend`](#the-hashmapstatebackend) and [`FileSystemCheckpointStorage`]({{< ref "docs/ops/state/checkpoints#the-filesystemcheckpointstorage" >}}).
+
+#### `flink-conf.yaml` configuration
+
+```yaml
+state.backend: hashmap
+state.checkpoints.dir: file:///checkpoint-dir/
+
+# Optional, Flink will automatically default to FileSystemCheckpointStorage
+# when a checkpoint directory is specified.
+state.checkpoint-storage: filesystem
+```
+
+#### Code Configuration
+
+{{< tabs "memorystatebackendmigration" >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new HashMapStateBackend());
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+
+
+// Advanced FsStateBackend configurations, such as write buffer size
+// can be set by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new HashMapStateBackend)
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
+
+
+// Advanced FsStateBackend configurations, such as write buffer size
+// can be set by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+### RocksDBStateBackend 
+
+The legacy `RocksDBStateBackend` is equivalent to using [`EmbeddedRocksDBStateBackend`](#the-embeddedrocksdbstatebackend) and [`FileSystemCheckpointStorage`]({{< ref "docs/ops/state/checkpoints#the-filesystemcheckpointstorage" >}}).
+
+#### `flink-conf.yaml` configuration 
+
+```yaml
+state.backend: rocksdb
+state.checkpoints.dir: file:///checkpoint-dir/
+
+# Optional, Flink will automatically default to FileSystemCheckpointStorage
+# when a checkpoint directory is specified.
+state.checkpoint-storage: filesystem
+```
+
+#### Code Configuration
+
+{{< tabs "memorystatebackendmigration" >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new EmbeddedRocksDBStateBackend());
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+
+
+// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
+// to specify advanced checkpointing configurations such as write buffer size,
+// you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new EmbeddedRocksDBStateBackend)
+env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
+
+
+// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
+// to specify advanced checkpointing configurations such as write buffer size,
+// you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
+env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
+{{< /tabs>}}
\ No newline at end of file
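
The three mappings in the migration guide above can be summarized as a small lookup. The following is only an illustrative sketch in plain Java with no Flink dependency: the class `LegacyBackendMigration` and its `migrate` helper are hypothetical names invented here and are not part of Flink. The configuration keys and values are the ones shown in the `flink-conf.yaml` snippets of the guide.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Flink class): maps a legacy backend class name
// to the flink-conf.yaml entries listed in the migration guide.
final class LegacyBackendMigration {

    static Map<String, String> migrate(String legacyBackend, String checkpointDir) {
        Map<String, String> conf = new LinkedHashMap<>();
        switch (legacyBackend) {
            case "MemoryStateBackend":
                // HashMapStateBackend + JobManagerCheckpointStorage
                conf.put("state.backend", "hashmap");
                conf.put("state.checkpoint-storage", "jobmanager");
                break;
            case "FsStateBackend":
                // HashMapStateBackend + FileSystemCheckpointStorage
                conf.put("state.backend", "hashmap");
                conf.put("state.checkpoints.dir", checkpointDir);
                conf.put("state.checkpoint-storage", "filesystem");
                break;
            case "RocksDBStateBackend":
                // EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage
                conf.put("state.backend", "rocksdb");
                conf.put("state.checkpoints.dir", checkpointDir);
                conf.put("state.checkpoint-storage", "filesystem");
                break;
            default:
                throw new IllegalArgumentException("Not a legacy backend: " + legacyBackend);
        }
        return conf;
    }

    public static void main(String[] args) {
        // Print the entries in flink-conf.yaml form for one legacy backend.
        migrate("RocksDBStateBackend", "file:///checkpoint-dir/")
                .forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

In each case the `state.checkpoint-storage` entry is optional according to the guide, since Flink picks the checkpoint storage based on whether a checkpoint directory is configured; the sketch emits it anyway to make the pairing explicit.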