You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/07/18 06:23:26 UTC
[flink] branch master updated: [FLINK-28140][python][docs] Improve the documentation by adding Python examples in several pages
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cf6e72e504a [FLINK-28140][python][docs] Improve the documentation by adding Python examples in several pages
cf6e72e504a is described below
commit cf6e72e504a7a5849dd397f6f0db2d2ce2cf1710
Author: Mingde Peng <pe...@gmail.com>
AuthorDate: Mon Jul 18 09:57:41 2022 +0800
[FLINK-28140][python][docs] Improve the documentation by adding Python examples in several pages
This closes #20290.
---
.../datastream/event-time/generating_watermarks.md | 21 +++++++++
.../execution/execution_configuration.md | 6 +++
.../docs/dev/datastream/execution/parallel.md | 41 ++++++++++++++++
docs/content.zh/docs/dev/table/tableApi.md | 54 +++++++++++++---------
docs/content.zh/docs/dev/table/timezone.md | 15 ++++++
docs/content.zh/docs/ops/state/state_backends.md | 48 ++++++++++++++++++-
.../docs/ops/state/task_failure_recovery.md | 34 ++++++++++++++
.../datastream/event-time/generating_watermarks.md | 21 +++++++++
.../execution/execution_configuration.md | 6 +++
.../docs/dev/datastream/execution/parallel.md | 41 ++++++++++++++++
docs/content/docs/dev/table/tableApi.md | 54 +++++++++++++---------
docs/content/docs/dev/table/timezone.md | 15 ++++++
docs/content/docs/ops/state/state_backends.md | 47 +++++++++++++++++++
.../docs/ops/state/task_failure_recovery.md | 39 ++++++++++++++++
14 files changed, 399 insertions(+), 43 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
index e4dbc627480..12d84000304 100644
--- a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md
@@ -415,6 +415,11 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
{{< /tabs >}}
<a name="writing-a-punctuated-watermarkgenerator"></a>
@@ -460,6 +465,11 @@ class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
{{< /tabs >}}
{{< hint warning >}}
@@ -499,6 +509,17 @@ kafkaSource.assignTimestampsAndWatermarks(
val stream: DataStream[MyType] = env.addSource(kafkaSource)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+kafka_source = FlinkKafkaConsumer("timer-stream-source", schema, props)
+
+stream = env \
+    .add_source(kafka_source) \
+    .assign_timestamps_and_watermarks(
+        WatermarkStrategy
+        .for_bounded_out_of_orderness(Duration.of_seconds(20)))
+```
+{{< /tab >}}
{{< /tabs >}}
{{< img src="/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" >}}
diff --git a/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md b/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md
index 086847db5e4..46ebc3bde1f 100644
--- a/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md
+++ b/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md
@@ -43,6 +43,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+execution_config = env.get_config()
+```
+{{< /tab >}}
{{< /tabs >}}
以下是可用的配置选项:(默认为粗体)
diff --git a/docs/content.zh/docs/dev/datastream/execution/parallel.md b/docs/content.zh/docs/dev/datastream/execution/parallel.md
index d30495c0eb8..4999c47d858 100644
--- a/docs/content.zh/docs/dev/datastream/execution/parallel.md
+++ b/docs/content.zh/docs/dev/datastream/execution/parallel.md
@@ -70,6 +70,24 @@ val wordCounts = text
.sum(1).setParallelism(5)
wordCounts.print()
+env.execute("Word Count Example")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+
+text = [...]
+word_counts = text \
+ .flat_map(lambda x: x.split(" ")) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1])) \
+ .set_parallelism(5)
+word_counts.print()
+
+
env.execute("Word Count Example")
```
{{< /tab >}}
@@ -107,6 +125,24 @@ val wordCounts = text
.sum(1)
wordCounts.print()
+env.execute("Word Count Example")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_parallelism(3)
+
+text = [...]
+word_counts = text \
+ .flat_map(lambda x: x.split(" ")) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1]))
+word_counts.print()
+
+
env.execute("Word Count Example")
```
{{< /tab >}}
@@ -160,6 +196,11 @@ try {
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md
index e5961db83d5..5a440b96770 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -1180,11 +1180,13 @@ val right = tableEnv.from("orders2")
left.union(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
left.union(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1213,11 +1215,13 @@ val right = tableEnv.from("orders2")
left.unionAll(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
-left.unionAll(right)
+left.union_all(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1244,11 +1248,13 @@ val right = tableEnv.from("orders2")
left.intersect(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
left.intersect(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1275,11 +1281,13 @@ val right = tableEnv.from("orders2")
left.intersectAll(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
-left.intersectAll(right)
+left.intersect_all(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1306,11 +1314,13 @@ val right = tableEnv.from("orders2")
left.minus(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
left.minus(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1336,11 +1346,13 @@ val right = tableEnv.from("orders2")
left.minusAll(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
-left.minusAll(right)
+left.minus_all(right)
+```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content.zh/docs/dev/table/timezone.md b/docs/content.zh/docs/dev/table/timezone.md
index 7f8fce6b24b..7072e24f492 100644
--- a/docs/content.zh/docs/dev/table/timezone.md
+++ b/docs/content.zh/docs/dev/table/timezone.md
@@ -115,6 +115,21 @@ tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env_setting = EnvironmentSettings.in_streaming_mode()
+t_env = TableEnvironment.create(env_setting)
+
+# set to UTC time zone
+t_env.get_config().set_local_timezone("UTC")
+
+# set to Shanghai time zone
+t_env.get_config().set_local_timezone("Asia/Shanghai")
+
+# set to Los_Angeles time zone
+t_env.get_config().set_local_timezone("America/Los_Angeles")
+```
+{{< /tab >}}
{{< /tabs >}}
session(会话)的时区设置在 Flink SQL 中非常有用, 它的主要用法如下:
diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index fdd4465c505..13bcdebbd1c 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -136,6 +136,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new HashMapStateBackend())
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(HashMapStateBackend())
+```
+{{< /tab >}}
{{< /tabs >}}
如果你想在 IDE 中使用 `EmbeddedRocksDBStateBackend`,或者需要在作业中通过编程方式动态配置它,必须添加以下依赖到 Flink 项目中。
@@ -292,6 +298,8 @@ RocksDB State Backend 会将 [这里定义]({{< ref "docs/deployment/config" >}}
下面是自定义 `ConfigurableRocksDBOptionsFactory` 的一个示例 (开发完成后,请将您的实现类全名设置到 `state.backend.rocksdb.options-factory`).
+{{< tabs "6e6f1fd6-fcc6-4af4-929f-97dc7d639ef9" >}}
+{{< tab "Java" >}}
```java
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
public static final ConfigOption<Integer> BLOCK_RESTART_INTERVAL = ConfigOptions
@@ -326,7 +334,13 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
}
}
```
-
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
+{{< /tabs >}}
{{< top >}}
<a name="enabling-changelog"></a>
@@ -496,6 +510,13 @@ env.setStateBackend(new HashMapStateBackend)
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(HashMapStateBackend())
+env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())
+```
+{{< /tab >}}
{{< /tabs>}}
### FsStateBackend
@@ -540,6 +561,18 @@ env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(HashMapStateBackend())
+env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")
+
+
+# Advanced FsStateBackend configurations, such as write buffer size,
+# can be set by manually instantiating a FileSystemCheckpointStorage object.
+env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
{{< /tabs>}}
### RocksDBStateBackend
@@ -586,4 +619,17 @@ env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(EmbeddedRocksDBStateBackend())
+env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")
+
+
+# If you manually passed FsStateBackend into the RocksDBStateBackend constructor
+# to specify advanced checkpointing configurations such as write buffer size,
+# you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
+env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
{{< /tabs>}}
diff --git a/docs/content.zh/docs/ops/state/task_failure_recovery.md b/docs/content.zh/docs/ops/state/task_failure_recovery.md
index 0057ee0ec81..f3a7d0a8f80 100644
--- a/docs/content.zh/docs/ops/state/task_failure_recovery.md
+++ b/docs/content.zh/docs/ops/state/task_failure_recovery.md
@@ -74,6 +74,15 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
+ 3, # 尝试重启的次数
+ 10000 # 延时(毫秒)
+))
+```
+{{< /tab >}}
{{< /tabs >}}
@@ -122,6 +131,15 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
+ 3, # 尝试重启的次数
+ 10000 # 延时(毫秒)
+))
+```
+{{< /tab >}}
{{< /tabs >}}
@@ -169,6 +187,16 @@ env.setRestartStrategy(RestartStrategies.failureRateRestart(
))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.failure_rate_restart(
+ 3, # 每个时间间隔的最大故障次数
+ 300000, # 测量故障率的时间间隔(毫秒)
+ 10000 # 延时(毫秒)
+))
+```
+{{< /tab >}}
{{< /tabs >}}
@@ -195,6 +223,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.no_restart())
+```
+{{< /tab >}}
{{< /tabs >}}
### Fallback Restart Strategy
diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
index 2cc5ed1d4d6..a5d82970bd0 100644
--- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
@@ -471,6 +471,11 @@ class TimeLagWatermarkGenerator extends WatermarkGenerator[MyEvent] {
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Not yet supported in the Python API.
+```
+{{< /tab >}}
{{< /tabs >}}
### Writing a Punctuated WatermarkGenerator
@@ -517,6 +522,11 @@ class PunctuatedAssigner extends WatermarkGenerator[MyEvent] {
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Not yet supported in the Python API.
+```
+{{< /tab >}}
{{< /tabs >}}
{{< hint warning >}}
@@ -572,6 +582,17 @@ kafkaSource.assignTimestampsAndWatermarks(
val stream: DataStream[MyType] = env.addSource(kafkaSource)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+kafka_source = FlinkKafkaConsumer("timer-stream-source", schema, props)
+
+stream = env \
+    .add_source(kafka_source) \
+    .assign_timestamps_and_watermarks(
+        WatermarkStrategy
+        .for_bounded_out_of_orderness(Duration.of_seconds(20)))
+```
+{{< /tab >}}
{{< /tabs >}}
{{< img src="/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" >}}
diff --git a/docs/content/docs/dev/datastream/execution/execution_configuration.md b/docs/content/docs/dev/datastream/execution/execution_configuration.md
index 219651ade6f..c9a1a36469e 100644
--- a/docs/content/docs/dev/datastream/execution/execution_configuration.md
+++ b/docs/content/docs/dev/datastream/execution/execution_configuration.md
@@ -43,6 +43,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+execution_config = env.get_config()
+```
+{{< /tab >}}
{{< /tabs >}}
The following configuration options are available: (the default is bold)
diff --git a/docs/content/docs/dev/datastream/execution/parallel.md b/docs/content/docs/dev/datastream/execution/parallel.md
index 2fbe5c0377a..8125ec6b95f 100644
--- a/docs/content/docs/dev/datastream/execution/parallel.md
+++ b/docs/content/docs/dev/datastream/execution/parallel.md
@@ -76,6 +76,24 @@ val wordCounts = text
.sum(1).setParallelism(5)
wordCounts.print()
+env.execute("Word Count Example")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+
+text = [...]
+word_counts = text \
+ .flat_map(lambda x: x.split(" ")) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1])) \
+ .set_parallelism(5)
+word_counts.print()
+
+
env.execute("Word Count Example")
```
{{< /tab >}}
@@ -119,6 +137,24 @@ val wordCounts = text
.sum(1)
wordCounts.print()
+env.execute("Word Count Example")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_parallelism(3)
+
+text = [...]
+word_counts = text \
+ .flat_map(lambda x: x.split(" ")) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1]))
+word_counts.print()
+
+
env.execute("Word Count Example")
```
{{< /tab >}}
@@ -175,6 +211,11 @@ try {
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Not yet supported in the Python API.
+```
+{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content/docs/dev/table/tableApi.md b/docs/content/docs/dev/table/tableApi.md
index 08e3223b9e9..973b07f0b29 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -1179,11 +1179,13 @@ val right = tableEnv.from("orders2")
left.union(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
left.union(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1212,11 +1214,13 @@ val right = tableEnv.from("orders2")
left.unionAll(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
-left.unionAll(right)
+left.union_all(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1243,11 +1247,13 @@ val right = tableEnv.from("orders2")
left.intersect(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
left.intersect(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1274,11 +1280,13 @@ val right = tableEnv.from("orders2")
left.intersectAll(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
-left.intersectAll(right)
+left.intersect_all(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1305,11 +1313,13 @@ val right = tableEnv.from("orders2")
left.minus(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
left.minus(right)
+```
{{< /tab >}}
{{< /tabs >}}
@@ -1335,11 +1345,13 @@ val right = tableEnv.from("orders2")
left.minusAll(right)
```
{{< /tab >}}
-{{< tab >}}
-left = tableEnv.from_path("orders1")
-right = tableEnv.from_path("orders2")
+{{< tab "Python" >}}
+```python
+left = t_env.from_path("orders1")
+right = t_env.from_path("orders2")
-left.minusAll(right)
+left.minus_all(right)
+```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content/docs/dev/table/timezone.md b/docs/content/docs/dev/table/timezone.md
index 6d6dc7aec57..7c689ab0101 100644
--- a/docs/content/docs/dev/table/timezone.md
+++ b/docs/content/docs/dev/table/timezone.md
@@ -115,6 +115,21 @@ tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env_setting = EnvironmentSettings.in_streaming_mode()
+t_env = TableEnvironment.create(env_setting)
+
+# set to UTC time zone
+t_env.get_config().set_local_timezone("UTC")
+
+# set to Shanghai time zone
+t_env.get_config().set_local_timezone("Asia/Shanghai")
+
+# set to Los_Angeles time zone
+t_env.get_config().set_local_timezone("America/Los_Angeles")
+```
+{{< /tab >}}
{{< /tabs >}}
The session time zone is useful in Flink SQL, the main usages are:
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index 87cbc514942..5b6834bbf87 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -129,6 +129,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new HashMapStateBackend())
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(HashMapStateBackend())
+```
+{{< /tab >}}
{{< /tabs >}}
If you want to use the `EmbeddedRocksDBStateBackend` in your IDE or configure it programmatically in your Flink job, you will have to add the following dependency to your Flink project.
@@ -290,6 +296,8 @@ allocating more memory than configured.
Below is an example how to define a custom ConfigurableOptionsFactory (set class name under `state.backend.rocksdb.options-factory`).
+{{< tabs "6e6f1fd6-fcc6-4af4-929f-97dc7d639ef8" >}}
+{{< tab "Java" >}}
```java
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
public static final ConfigOption<Integer> BLOCK_RESTART_INTERVAL = ConfigOptions
@@ -324,6 +332,13 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
}
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Not yet supported in the Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
{{< top >}}
@@ -487,6 +502,13 @@ env.setStateBackend(new HashMapStateBackend)
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(HashMapStateBackend())
+env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())
+```
+{{< /tab >}}
{{< /tabs>}}
### FsStateBackend
@@ -531,6 +553,18 @@ env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(HashMapStateBackend())
+env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")
+
+
+# Advanced FsStateBackend configurations, such as write buffer size,
+# can be set by manually instantiating a FileSystemCheckpointStorage object.
+env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
{{< /tabs>}}
### RocksDBStateBackend
@@ -577,4 +611,17 @@ env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(EmbeddedRocksDBStateBackend())
+env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")
+
+
+# If you manually passed FsStateBackend into the RocksDBStateBackend constructor
+# to specify advanced checkpointing configurations such as write buffer size,
+# you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
+env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir"))
+```
+{{< /tab >}}
{{< /tabs>}}
diff --git a/docs/content/docs/ops/state/task_failure_recovery.md b/docs/content/docs/ops/state/task_failure_recovery.md
index 340e394ac6f..858264ba56c 100644
--- a/docs/content/docs/ops/state/task_failure_recovery.md
+++ b/docs/content/docs/ops/state/task_failure_recovery.md
@@ -77,6 +77,15 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
+ 3, # number of restart attempts
+ 10000 # delay(millisecond)
+))
+```
+{{< /tab >}}
{{< /tabs >}}
@@ -124,6 +133,15 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
+ 3, # number of restart attempts
+ 10000 # delay(millisecond)
+))
+```
+{{< /tab >}}
{{< /tabs >}}
@@ -179,6 +197,11 @@ env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Not yet supported in the Python API.
+```
+{{< /tab >}}
{{< /tabs >}}
### Failure Rate Restart Strategy
@@ -223,6 +246,16 @@ env.setRestartStrategy(RestartStrategies.failureRateRestart(
))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.failure_rate_restart(
+ 3, # max failures per interval
+ 300000, # interval for measuring failure rate (millisecond)
+ 10000 # delay(millisecond)
+))
+```
+{{< /tab >}}
{{< /tabs >}}
@@ -249,6 +282,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_restart_strategy(RestartStrategies.no_restart())
+```
+{{< /tab >}}
{{< /tabs >}}
### Fallback Restart Strategy