You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/07/30 06:43:34 UTC

[flink] branch release-1.15 updated: [FLINK-28140][python][docs] Improve the documentation connector pages and metrics pages

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 8db40cecadf [FLINK-28140][python][docs] Improve the documentation connector pages and metrics pages
8db40cecadf is described below

commit 8db40cecadf48bde47e18d64d30651157ea52a48
Author: Mingde Peng <pe...@gmail.com>
AuthorDate: Thu Jul 28 11:41:33 2022 +0800

    [FLINK-28140][python][docs] Improve the documentation connector pages and metrics pages
    
    This closes #20385.
---
 docs/content.zh/docs/connectors/datastream/jdbc.md |  29 ++++++
 .../docs/connectors/datastream/rabbitmq.md         |  24 +++++
 docs/content.zh/docs/dev/datastream/sources.md     |  52 ++++++++++
 .../dev/table/concepts/temporal_table_function.md  |  10 ++
 docs/content.zh/docs/ops/metrics.md                |  97 ++++++++++++++++++
 docs/content/docs/connectors/datastream/jdbc.md    | 113 ++++++++++++++++++++-
 .../content/docs/connectors/datastream/rabbitmq.md |  24 +++++
 docs/content/docs/dev/datastream/sources.md        |  52 ++++++++++
 .../dev/table/concepts/temporal_table_function.md  |  10 ++
 docs/content/docs/ops/metrics.md                   |  89 ++++++++++++++++
 10 files changed, 499 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/connectors/datastream/jdbc.md b/docs/content.zh/docs/connectors/datastream/jdbc.md
index 31e4f182fcd..55728ea124c 100644
--- a/docs/content.zh/docs/connectors/datastream/jdbc.md
+++ b/docs/content.zh/docs/connectors/datastream/jdbc.md
@@ -38,6 +38,8 @@ under the License.
 更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。
 
 用法示例:
+{{< tabs "4ab65f13-608a-411a-8d24-e303f384ab5d" >}}
+{{< tab "Java" >}}
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env
@@ -57,5 +59,32 @@ env
                         .build()));
 env.execute();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING(), Types.INT()])
+env.from_collection(
+    [(101, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
+     (102, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
+     (103, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
+     (104, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
+     ], type_info=type_info) \
+    .add_sink(
+    JdbcSink.sink(
+        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
+        type_info,
+        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+            .with_url('jdbc:postgresql://dbhost:5432/postgresdb')
+            .with_driver_name('org.postgresql.Driver')
+            .with_user_name('someUser')
+            .with_password('somePassword')
+            .build()
+    ))
+
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 更多细节请查看 API documentation 。
diff --git a/docs/content.zh/docs/connectors/datastream/rabbitmq.md b/docs/content.zh/docs/connectors/datastream/rabbitmq.md
index 668987fe6e9..783cca83121 100644
--- a/docs/content.zh/docs/connectors/datastream/rabbitmq.md
+++ b/docs/content.zh/docs/connectors/datastream/rabbitmq.md
@@ -154,6 +154,14 @@ val connectionConfig = new RMQConnectionConfig.Builder()
     .build
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+connection_config = RMQConnectionConfig.Builder() \
+    .set_prefetch_count(30000) \
+    ...
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 RabbitMQ Source 默认情况下是不设置 prefetch count 的,这意味着 RabbitMQ 服务器将会无限制地向 source 发送消息。因此在生产环境中,最好要设置它。当消费海量数据的队列并且启用 checkpointing 时,消息只有在做完 checkpoint 后才会被确认,因此也许需要对 prefetch count 做一些调整来减少不必要的循环。
@@ -197,6 +205,22 @@ stream.addSink(new RMQSink[String](
     new SimpleStringSchema))  // serialization schema to turn Java objects to messages
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+stream = ...
+
+connection_config = RMQConnectionConfig.Builder() \
+    .set_host("localhost") \
+    .set_port(5000) \
+    ...
+    .build()
+
+stream.add_sink(RMQSink(
+    connection_config,      # config for the RabbitMQ connection
+    'queueName',            # name of the RabbitMQ queue to send messages to
+    SimpleStringSchema()))  # serialization schema to turn Java objects to messages
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 更多关于 RabbitMQ 的信息请参考 [这里](http://www.rabbitmq.com/).
diff --git a/docs/content.zh/docs/dev/datastream/sources.md b/docs/content.zh/docs/dev/datastream/sources.md
index b01c631fe01..55eec7691a6 100644
--- a/docs/content.zh/docs/dev/datastream/sources.md
+++ b/docs/content.zh/docs/dev/datastream/sources.md
@@ -126,6 +126,8 @@ SplitEnumerator 被认为是整个 Source 的“大脑”。SplitEnumerator 的
 `SplitEnumerator` 的实现可以仅采用被动工作方式,即仅在其方法被调用时采取协调操作,但是一些 `SplitEnumerator` 的实现会采取主动性的工作方式。例如,`SplitEnumerator` 定期寻找分片并分配给 `SourceReader`。
 这类问题使用 `SplitEnumeratorContext` 类中的 `callAsync()` 方法比较方便。下面的代码片段展示了如何在 `SplitEnumerator` 不需要自己维护线程的条件下实现这一点。
 
+{{< tabs "066b6695-5bc3-4d7a-9032-ff6b1d15c3b1" >}}
+{{< tab "Java" >}}
 ```java
 class MySplitEnumerator implements SplitEnumerator<MySplit> {
     private final long DISCOVER_INTERVAL = 60_000L;
@@ -152,6 +154,13 @@ class MySplitEnumerator implements SplitEnumerator<MySplit> {
     ...
 }
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 <a name="SourceReader"></a>
 
@@ -207,6 +216,18 @@ val stream = env.fromSource(
 ...
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+
+my_source = ...
+
+env.from_source(
+    my_source,
+    WatermarkStrategy.no_watermarks(),
+    "my_source_name")
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ----
@@ -260,6 +281,8 @@ val stream = env.fromSource(
 
 以下代码片段实现了此线程模型。
 
+{{< tabs "bde5ff60-4e61-4644-a6dc-50524acb7b33" >}}
+{{< tab "Java" >}}
 ```java
 /**
  * 一个SplitFetcherManager,它具有固定数量的分片提取器,
@@ -299,9 +322,18 @@ public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
     }
 }
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 使用这种线程模型的`SourceReader`可以像下面这样创建:
 
+{{< tabs "bde5ff60-4e61-4614-a6dc-50524aca6c31" >}}
+{{< tab "Java" >}}
 ```java
 public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
         extends SourceReaderBase<E, T, SplitT, SplitStateT> {
@@ -342,6 +374,13 @@ public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, Spli
     }
 }
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 `SourceReader` 的实现还可以在 `SplitFetcherManager` 和 `SourceReaderBase` 的基础上编写自己的线程模型。
 
@@ -359,12 +398,25 @@ Source 的实现需要完成一部分*事件时间*分配和*水印生成*的工
 
 在 DataStream API 创建期间, `WatermarkStrategy` 会被传递给 Source,并同时创建 [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) 和 [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java)。
 
+{{< tabs "bde5ff60-4e62-4643-a6dc-50524acb7b34" >}}
+{{< tab "Java" >}}
 ```java
 environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName);
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+environment.from_source(
+    source: Source,
+    watermark_strategy: WatermarkStrategy,
+    source_name: str,
+    type_info: TypeInformation = None) 
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 `TimestampAssigner` 和 `WatermarkGenerator` 作为 `ReaderOutput`(或 `SourceOutput`)的一部分透明地运行,因此 Source 实现者不必实现任何时间戳提取和水印生成的代码。 
 
diff --git a/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md b/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md
index 05e577f68ae..49696e6568f 100644
--- a/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md
+++ b/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md
@@ -76,6 +76,11 @@ rates = tEnv
 tEnv.registerFunction("rates", rates)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python Table API.
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ## Temporal Table Function Join
@@ -126,6 +131,11 @@ val result = orders
     .select($"(o_amount * r_rate).sum as amount"))
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 {{< top >}}
diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 072b2fd0084..e13d83056c4 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -85,6 +85,23 @@ class MyMapper extends RichMapFunction[String,String] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+    def __init__(self):
+        self.counter = None
+
+    def open(self, runtime_context: RuntimeContext):
+        self.counter = runtime_context \
+            .get_metrics_group() \
+            .counter("my_counter")
+
+    def map(self, value: str):
+        self.counter.inc()
+        return value
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -133,6 +150,11 @@ class MyMapper extends RichMapFunction[String,String] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -189,6 +211,24 @@ new class MyMapper extends RichMapFunction[String,String] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+    def __init__(self):
+        self.value_to_expose = 0
+
+    def open(self, runtime_context: RuntimeContext):
+        runtime_context \
+            .get_metrics_group() \
+            .gauge("my_gauge", lambda: self.value_to_expose)
+
+    def map(self, value: str):
+        self.value_to_expose += 1
+        return value
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -239,6 +279,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -300,6 +345,11 @@ class MyMapper extends RichMapFunction[Long, Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -348,6 +398,25 @@ class MyMapper extends RichMapFunction[Long,Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapperMeter(MapFunction):
+    def __init__(self):
+        self.meter = None
+
+    def open(self, runtime_context: RuntimeContext):
+        # an average rate of events per second over 120s, default is 60s.
+        self.meter = runtime_context \
+            .get_metrics_group() \
+            .meter("my_meter", time_span_in_seconds=120)
+
+    def map(self, value: str):
+        self.meter.markEvent()
+        return value
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -407,6 +476,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -454,6 +528,21 @@ counter = getRuntimeContext()
   .addGroup("MyMetricsKey", "MyMetricsValue")
   .counter("myCounter")
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+counter = runtime_context \
+    .get_metric_group() \
+    .add_group("my_metrics") \
+    .counter("my_counter")
+
+counter = runtime_context \
+    .get_metric_group() \
+    .add_group("my_metrics_key", "my_metrics_value") \
+    .counter("my_counter")
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -536,6 +625,14 @@ counter = getRuntimeContext()
 
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+counter = runtime_context
+    .get_metric_group() \
+    .add_group("my_metrics_key", "my_metrics_value") \
+    .counter("my_counter")
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ## Reporter
diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index 85c67235b0c..4610e393d8e 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -41,6 +41,8 @@ The JDBC sink provides at-least-once guarantee.
 Effectively though, exactly-once can be achieved by crafting upsert SQL statements or idempotent SQL updates.
 Configuration goes as follow (see also {{< javadoc file="org/apache/flink/connector/jdbc/JdbcSink.html" name="JdbcSink javadoc" >}}).
 
+{{< tabs "4ab65f13-607a-411a-8d24-e709f701cd4c" >}}
+{{< tab "Java" >}}
 ```java
 JdbcSink.sink(
       	sqlDmlStatement,                       // mandatory
@@ -48,7 +50,19 @@ JdbcSink.sink(
       	jdbcExecutionOptions,                  // optional
       	jdbcConnectionOptions                  // mandatory
 );
-```        	
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+JdbcSink.sink(
+    sql_dml_statement,          # mandatory
+    type_info,                  # mandatory
+    jdbc_connection_options,    # mandatory
+    jdbc_execution_options      # optional
+)
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 ### SQL DML statement and JDBC statement builder
 
@@ -68,6 +82,8 @@ It then repeatedly calls a user-provided function to update that prepared statem
 
 The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also {{< javadoc name="JdbcExecutionOptions javadoc" file="org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}})
 
+{{< tabs "4ab65f13-607a-411a-8d24-e709f512ed6k" >}}
+{{< tab "Java" >}}
 ```java
 JdbcExecutionOptions.builder()
         .withBatchIntervalMs(200)             // optional: default = 0, meaning no time-based execution is done
@@ -75,6 +91,17 @@ JdbcExecutionOptions.builder()
         .withMaxRetries(5)                    // optional: default = 3 
 .build();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+JdbcExecutionOptions.builder() \
+    .with_batch_interval_ms(2000) \
+    .with_batch_size(100) \
+    .with_max_retries(5) \
+    .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 A JDBC batch is executed as soon as one of the following conditions is true:
 
@@ -89,6 +116,8 @@ Please see {{< javadoc name="JdbcConnectionOptions javadoc" file="org/apache/fli
 
 ### Full example
 
+{{< tabs "4ab65f13-608a-411a-8d24-e303f348ds8d" >}}
+{{< tab "Java" >}}
 ```java
 public class JdbcSinkExample {
 
@@ -139,6 +168,38 @@ public class JdbcSinkExample {
     }
 }
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING(), Types.INT()])
+env.from_collection(
+    [(101, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
+     (102, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
+     (103, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
+     (104, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
+     ], type_info=type_info) \
+    .add_sink(
+    JdbcSink.sink(
+        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
+        type_info,
+        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+            .with_url('jdbc:postgresql://dbhost:5432/postgresdb')
+            .with_driver_name('org.postgresql.Driver')
+            .with_user_name('someUser')
+            .with_password('somePassword')
+            .build(),
+        JdbcExecutionOptions.builder()
+            .with_batch_interval_ms(1000)
+            .with_batch_size(200)
+            .with_max_retries(5)
+            .build()
+    ))
+
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 ## `JdbcSink.exactlyOnceSink`
 
@@ -153,6 +214,9 @@ To use it, create a sink using `exactlyOnceSink()` method as above and additiona
 - [XA DataSource](https://docs.oracle.com/javase/8/docs/api/javax/sql/XADataSource.html) Supplier
 
 For example:
+
+{{< tabs "4ab65f13-608a-411a-8d24-e304f627ac8f" >}}
+{{< tab "Java" >}}
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env
@@ -179,13 +243,32 @@ env
                 });
 env.execute();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 **NOTE:** Some databases only allow a single XA transaction per connection (e.g. PostgreSQL, MySQL).
 In such cases, please use the following API to construct `JdbcExactlyOnceOptions`:
+
+{{< tabs "4ab65f13-608a-411a-8d24-e304f627cd4e" >}}
+{{< tab "Java" >}}
 ```java
 JdbcExactlyOnceOptions.builder()
 .withTransactionPerConnection(true)
 .build();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 This will make Flink use a separate connection for every XA transaction. This may require adjusting connection limits.
 For PostgreSQL and MySQL, this can be done by increasing `max_connections`.
 
@@ -198,28 +281,56 @@ with `JdbcExecutionOptions.maxRetries == 0`; otherwise, duplicated results maybe
 
 ### `XADataSource` examples
 PostgreSQL `XADataSource` example:
+{{< tabs "4ab65f13-608a-411a-8d24-e304f323ab3a" >}}
+{{< tab "Java" >}}
 ```java
 PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
 xaDataSource.setUrl("jdbc:postgresql://localhost:5432/postgres");
 xaDataSource.setUser(username);
 xaDataSource.setPassword(password);
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 MySQL `XADataSource` example:
+{{< tabs "4ab65f13-608a-411a-8d24-b213f323ca3c" >}}
+{{< tab "Java" >}}
 ```java
 MysqlXADataSource xaDataSource = new com.mysql.cj.jdbc.MysqlXADataSource();
 xaDataSource.setUrl("jdbc:mysql://localhost:3306/");
 xaDataSource.setUser(username);
 xaDataSource.setPassword(password);
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 Oracle `XADataSource` example:
+{{< tabs "4ab65f13-608a-411a-8d24-b213f337ad5f" >}}
+{{< tab "Java" >}}
 ```java
 OracleXADataSource xaDataSource = new oracle.jdbc.xa.OracleXADataSource();
 xaDataSource.setURL("jdbc:oracle:oci8:@");
 xaDataSource.setUser("scott");
 xaDataSource.setPassword("tiger");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 Please also take Oracle connection pooling into account.
 
 Please refer to the `JdbcXaSinkFunction` documentation for more details.
diff --git a/docs/content/docs/connectors/datastream/rabbitmq.md b/docs/content/docs/connectors/datastream/rabbitmq.md
index 543a8fe327b..3d8678aee2f 100644
--- a/docs/content/docs/connectors/datastream/rabbitmq.md
+++ b/docs/content/docs/connectors/datastream/rabbitmq.md
@@ -175,6 +175,14 @@ val connectionConfig = new RMQConnectionConfig.Builder()
     .build
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+connection_config = RMQConnectionConfig.Builder() \
+    .set_prefetch_count(30000) \
+    ...
+    .build()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 The prefetch count is unset by default, meaning the RabbitMQ server will send unlimited messages. In production, it
@@ -221,6 +229,22 @@ stream.addSink(new RMQSink[String](
     new SimpleStringSchema))  // serialization schema to turn Java objects to messages
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+stream = ...
+
+connection_config = RMQConnectionConfig.Builder() \
+    .set_host("localhost") \
+    .set_port(5000) \
+    ...
+    .build()
+
+stream.add_sink(RMQSink(
+    connection_config,      # config for the RabbitMQ connection
+    'queueName',            # name of the RabbitMQ queue to send messages to
+    SimpleStringSchema()))  # serialization schema to turn Java objects to messages
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
diff --git a/docs/content/docs/dev/datastream/sources.md b/docs/content/docs/dev/datastream/sources.md
index 2a9265ab453..185425265bc 100644
--- a/docs/content/docs/dev/datastream/sources.md
+++ b/docs/content/docs/dev/datastream/sources.md
@@ -117,6 +117,8 @@ The `Source` implementation is expected to pass the `SplitEnumeratorContext` to
 While a `SplitEnumerator` implementation can work well in a reactive way by only taking coordination actions when its method is invoked, some `SplitEnumerator` implementations might want to take actions actively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReaders`. 
 Such implementations may find that the `callAsync()` method `SplitEnumeratorContext` is handy. The code snippet below shows how the `SplitEnumerator` implementation can achieve that without maintaining its own threads.
 
+{{< tabs "066b6695-5bc3-4d7a-9033-ff6b1d15c3b1" >}}
+{{< tab "Java" >}}
 ```java
 class MySplitEnumerator implements SplitEnumerator<MySplit> {
     private final long DISCOVER_INTERVAL = 60_000L;
@@ -143,6 +145,13 @@ class MySplitEnumerator implements SplitEnumerator<MySplit> {
     ...
 }
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 ### SourceReader
 
@@ -194,6 +203,18 @@ val stream = env.fromSource(
 ...
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+
+my_source = ...
+
+env.from_source(
+    my_source,
+    WatermarkStrategy.no_watermarks(),
+    "my_source_name")
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ----
@@ -240,6 +261,8 @@ As an example, as illustrated below, a `SplitFetcherManager` may have a fixed nu
 
 The following code snippet implements this threading model.
 
+{{< tabs "bde5ff60-4e61-4633-a6dc-50524acb7b33" >}}
+{{< tab "Java" >}}
 ```java
 /**
  * A SplitFetcherManager that has a fixed size of split fetchers and assign splits 
@@ -279,9 +302,18 @@ public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
     }
 }
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 And a `SourceReader` using this threading model can be created like following:
 
+{{< tabs "bde5ff60-4e61-4633-a6dc-50524aca6c31" >}}
+{{< tab "Java" >}}
 ```java
 public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
         extends SourceReaderBase<E, T, SplitT, SplitStateT> {
@@ -322,6 +354,13 @@ public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, Spli
     }
 }
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 The `SourceReader` implementations can also implement their own threading model easily on top of the `SplitFetcherManager` and `SourceReaderBase`.
 
@@ -337,12 +376,25 @@ Applications based on the legacy [SourceFunction](https://github.com/apache/flin
 
 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
 
+{{< tabs "bde5ff60-4e61-4633-a6dc-50524acb7b36" >}}
+{{< tab "Java" >}}
 ```java
 environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName);
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+environment.from_source(
+    source: Source,
+    watermark_strategy: WatermarkStrategy,
+    source_name: str,
+    type_info: TypeInformation = None) 
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement any timestamp extraction and watermark generation code.
 
diff --git a/docs/content/docs/dev/table/concepts/temporal_table_function.md b/docs/content/docs/dev/table/concepts/temporal_table_function.md
index ac6665d81c3..000ae122854 100644
--- a/docs/content/docs/dev/table/concepts/temporal_table_function.md
+++ b/docs/content/docs/dev/table/concepts/temporal_table_function.md
@@ -76,6 +76,11 @@ rates = tEnv
 tEnv.registerFunction("rates", rates)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ## Temporal Table Function Join
@@ -126,6 +131,11 @@ val result = orders
     .select($"(o_amount * r_rate).sum as amount"))
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 {{< top >}}
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 3a574e39995..9ef43ec1707 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -85,6 +85,23 @@ class MyMapper extends RichMapFunction[String,String] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+    def __init__(self):
+        self.counter = None
+
+    def open(self, runtime_context: RuntimeContext):
+        self.counter = runtime_context \
+            .get_metrics_group() \
+            .counter("my_counter")
+
+    def map(self, value: str):
+        self.counter.inc()
+        return value
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -133,6 +150,11 @@ class MyMapper extends RichMapFunction[String,String] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -189,6 +211,24 @@ new class MyMapper extends RichMapFunction[String,String] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+    def __init__(self):
+        self.value_to_expose = 0
+
+    def open(self, runtime_context: RuntimeContext):
+        runtime_context \
+            .get_metrics_group() \
+            .gauge("my_gauge", lambda: self.value_to_expose)
+
+    def map(self, value: str):
+        self.value_to_expose += 1
+        return value
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -239,6 +279,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -300,6 +345,11 @@ class MyMapper extends RichMapFunction[Long, Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -348,6 +398,25 @@ class MyMapper extends RichMapFunction[Long,Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapperMeter(MapFunction):
+    def __init__(self):
+        self.meter = None
+
+    def open(self, runtime_context: RuntimeContext):
+        # an average rate of events per second over 120s, default is 60s.
+        self.meter = runtime_context
+            .get_metrics_group()
+            .meter("my_meter", time_span_in_seconds=120)
+
+    def map(self, value: str):
+        self.meter.markEvent()
+        return value
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -407,6 +476,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
   }
 }
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -454,6 +528,21 @@ counter = getRuntimeContext()
   .addGroup("MyMetricsKey", "MyMetricsValue")
   .counter("myCounter")
 
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+counter = runtime_context \
+    .get_metric_group() \
+    .add_group("my_metrics") \
+    .counter("my_counter")
+
+counter = runtime_context \
+    .get_metric_group() \
+    .add_group("my_metrics_key", "my_metrics_value") \
+    .counter("my_counter")
+
 ```
 {{< /tab >}}
 {{< /tabs >}}