You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/07/30 06:43:34 UTC
[flink] branch release-1.15 updated: [FLINK-28140][python][docs] Improve the documentation connector pages and metrics pages
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 8db40cecadf [FLINK-28140][python][docs] Improve the documentation connector pages and metrics pages
8db40cecadf is described below
commit 8db40cecadf48bde47e18d64d30651157ea52a48
Author: Mingde Peng <pe...@gmail.com>
AuthorDate: Thu Jul 28 11:41:33 2022 +0800
[FLINK-28140][python][docs] Improve the documentation connector pages and metrics pages
This closes #20385.
---
docs/content.zh/docs/connectors/datastream/jdbc.md | 29 ++++++
.../docs/connectors/datastream/rabbitmq.md | 24 +++++
docs/content.zh/docs/dev/datastream/sources.md | 52 ++++++++++
.../dev/table/concepts/temporal_table_function.md | 10 ++
docs/content.zh/docs/ops/metrics.md | 97 ++++++++++++++++++
docs/content/docs/connectors/datastream/jdbc.md | 113 ++++++++++++++++++++-
.../content/docs/connectors/datastream/rabbitmq.md | 24 +++++
docs/content/docs/dev/datastream/sources.md | 52 ++++++++++
.../dev/table/concepts/temporal_table_function.md | 10 ++
docs/content/docs/ops/metrics.md | 89 ++++++++++++++++
10 files changed, 499 insertions(+), 1 deletion(-)
diff --git a/docs/content.zh/docs/connectors/datastream/jdbc.md b/docs/content.zh/docs/connectors/datastream/jdbc.md
index 31e4f182fcd..55728ea124c 100644
--- a/docs/content.zh/docs/connectors/datastream/jdbc.md
+++ b/docs/content.zh/docs/connectors/datastream/jdbc.md
@@ -38,6 +38,8 @@ under the License.
更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。
用法示例:
+{{< tabs "4ab65f13-608a-411a-8d24-e303f384ab5d" >}}
+{{< tab "Java" >}}
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
@@ -57,5 +59,32 @@ env
.build()));
env.execute();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING(), Types.INT()])
+env.from_collection(
+ [(101, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
+ (102, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
+ (103, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
+ (104, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
+ ], type_info=type_info) \
+ .add_sink(
+ JdbcSink.sink(
+ "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
+ type_info,
+ JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .with_url('jdbc:postgresql://dbhost:5432/postgresdb')
+ .with_driver_name('org.postgresql.Driver')
+ .with_user_name('someUser')
+ .with_password('somePassword')
+ .build()
+ ))
+
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
更多细节请查看 API documentation 。
diff --git a/docs/content.zh/docs/connectors/datastream/rabbitmq.md b/docs/content.zh/docs/connectors/datastream/rabbitmq.md
index 668987fe6e9..783cca83121 100644
--- a/docs/content.zh/docs/connectors/datastream/rabbitmq.md
+++ b/docs/content.zh/docs/connectors/datastream/rabbitmq.md
@@ -154,6 +154,14 @@ val connectionConfig = new RMQConnectionConfig.Builder()
.build
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+connection_config = RMQConnectionConfig.Builder() \
+ .set_prefetch_count(30000) \
+ ...
+ .build()
+```
+{{< /tab >}}
{{< /tabs >}}
RabbitMQ Source 默认情况下是不设置 prefetch count 的,这意味着 RabbitMQ 服务器将会无限制地向 source 发送消息。因此在生产环境中,最好要设置它。当消费海量数据的队列并且启用 checkpointing 时,消息只有在做完 checkpoint 后才会被确认,因此也许需要对 prefetch count 做一些调整来减少不必要的循环。
@@ -197,6 +205,22 @@ stream.addSink(new RMQSink[String](
new SimpleStringSchema)) // serialization schema to turn Java objects to messages
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+stream = ...
+
+connection_config = RMQConnectionConfig.Builder() \
+ .set_host("localhost") \
+ .set_port(5000) \
+ ...
+ .build()
+
+stream.add_sink(RMQSink(
+ connection_config, # config for the RabbitMQ connection
+ 'queueName', # name of the RabbitMQ queue to send messages to
+ SimpleStringSchema())) # serialization schema to turn Java objects to messages
+```
+{{< /tab >}}
{{< /tabs >}}
更多关于 RabbitMQ 的信息请参考 [这里](http://www.rabbitmq.com/).
diff --git a/docs/content.zh/docs/dev/datastream/sources.md b/docs/content.zh/docs/dev/datastream/sources.md
index b01c631fe01..55eec7691a6 100644
--- a/docs/content.zh/docs/dev/datastream/sources.md
+++ b/docs/content.zh/docs/dev/datastream/sources.md
@@ -126,6 +126,8 @@ SplitEnumerator 被认为是整个 Source 的“大脑”。SplitEnumerator 的
`SplitEnumerator` 的实现可以仅采用被动工作方式,即仅在其方法被调用时采取协调操作,但是一些 `SplitEnumerator` 的实现会采取主动性的工作方式。例如,`SplitEnumerator` 定期寻找分片并分配给 `SourceReader`。
这类问题使用 `SplitEnumeratorContext` 类中的 `callAsync()` 方法比较方便。下面的代码片段展示了如何在 `SplitEnumerator` 不需要自己维护线程的条件下实现这一点。
+{{< tabs "066b6695-5bc3-4d7a-9032-ff6b1d15c3b1" >}}
+{{< tab "Java" >}}
```java
class MySplitEnumerator implements SplitEnumerator<MySplit> {
private final long DISCOVER_INTERVAL = 60_000L;
@@ -152,6 +154,13 @@ class MySplitEnumerator implements SplitEnumerator<MySplit> {
...
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
+{{< /tabs >}}
<a name="SourceReader"></a>
@@ -207,6 +216,18 @@ val stream = env.fromSource(
...
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+
+my_source = ...
+
+env.from_source(
+ my_source,
+ WatermarkStrategy.no_watermarks(),
+ "my_source_name")
+```
+{{< /tab >}}
{{< /tabs >}}
----
@@ -260,6 +281,8 @@ val stream = env.fromSource(
以下代码片段实现了此线程模型。
+{{< tabs "bde5ff60-4e61-4644-a6dc-50524acb7b33" >}}
+{{< tab "Java" >}}
```java
/**
* 一个SplitFetcherManager,它具有固定数量的分片提取器,
@@ -299,9 +322,18 @@ public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
}
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
+{{< /tabs >}}
使用这种线程模型的`SourceReader`可以像下面这样创建:
+{{< tabs "bde5ff60-4e61-4614-a6dc-50524aca6c31" >}}
+{{< tab "Java" >}}
```java
public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
@@ -342,6 +374,13 @@ public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, Spli
}
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Python API 中尚不支持该特性。
+```
+{{< /tab >}}
+{{< /tabs >}}
`SourceReader` 的实现还可以在 `SplitFetcherManager` 和 `SourceReaderBase` 的基础上编写自己的线程模型。
@@ -359,12 +398,25 @@ Source 的实现需要完成一部分*事件时间*分配和*水印生成*的工
在 DataStream API 创建期间, `WatermarkStrategy` 会被传递给 Source,并同时创建 [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) 和 [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java)。
+{{< tabs "bde5ff60-4e62-4643-a6dc-50524acb7b34" >}}
+{{< tab "Java" >}}
```java
environment.fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+environment.from_source(
+ source: Source,
+ watermark_strategy: WatermarkStrategy,
+ source_name: str,
+ type_info: TypeInformation = None)
+```
+{{< /tab >}}
+{{< /tabs >}}
`TimestampAssigner` 和 `WatermarkGenerator` 作为 `ReaderOutput`(或 `SourceOutput`)的一部分透明地运行,因此 Source 实现者不必实现任何时间戳提取和水印生成的代码。
diff --git a/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md b/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md
index 05e577f68ae..49696e6568f 100644
--- a/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md
+++ b/docs/content.zh/docs/dev/table/concepts/temporal_table_function.md
@@ -76,6 +76,11 @@ rates = tEnv
tEnv.registerFunction("rates", rates)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python Table API.
+```
+{{< /tab >}}
{{< /tabs >}}
## Temporal Table Function Join
@@ -126,6 +131,11 @@ val result = orders
.select($"(o_amount * r_rate).sum as amount"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
{{< /tabs >}}
{{< top >}}
diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 072b2fd0084..e13d83056c4 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -85,6 +85,23 @@ class MyMapper extends RichMapFunction[String,String] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+ def __init__(self):
+ self.counter = None
+
+ def open(self, runtime_context: RuntimeContext):
+ self.counter = runtime_context \
+ .get_metrics_group() \
+ .counter("my_counter")
+
+ def map(self, value: str):
+ self.counter.inc()
+ return value
```
{{< /tab >}}
{{< /tabs >}}
@@ -133,6 +150,11 @@ class MyMapper extends RichMapFunction[String,String] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -189,6 +211,24 @@ new class MyMapper extends RichMapFunction[String,String] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+ def __init__(self):
+ self.value_to_expose = 0
+
+ def open(self, runtime_context: RuntimeContext):
+ runtime_context \
+ .get_metrics_group() \
+ .gauge("my_gauge", lambda: self.value_to_expose)
+
+ def map(self, value: str):
+ self.value_to_expose += 1
+ return value
+
```
{{< /tab >}}
{{< /tabs >}}
@@ -239,6 +279,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -300,6 +345,11 @@ class MyMapper extends RichMapFunction[Long, Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -348,6 +398,25 @@ class MyMapper extends RichMapFunction[Long,Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapperMeter(MapFunction):
+ def __init__(self):
+ self.meter = None
+
+ def open(self, runtime_context: RuntimeContext):
+ # an average rate of events per second over 120s, default is 60s.
+ self.meter = runtime_context \
+ .get_metrics_group() \
+ .meter("my_meter", time_span_in_seconds=120)
+
+ def map(self, value: str):
+ self.meter.mark_event()
+ return value
+
```
{{< /tab >}}
{{< /tabs >}}
@@ -407,6 +476,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -454,6 +528,21 @@ counter = getRuntimeContext()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+counter = runtime_context \
+ .get_metrics_group() \
+ .add_group("my_metrics") \
+ .counter("my_counter")
+
+counter = runtime_context \
+ .get_metrics_group() \
+ .add_group("my_metrics_key", "my_metrics_value") \
+ .counter("my_counter")
+
```
{{< /tab >}}
{{< /tabs >}}
@@ -536,6 +625,14 @@ counter = getRuntimeContext()
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+counter = runtime_context \
+ .get_metrics_group() \
+ .add_group("my_metrics_key", "my_metrics_value") \
+ .counter("my_counter")
+```
+{{< /tab >}}
{{< /tabs >}}
## Reporter
diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index 85c67235b0c..4610e393d8e 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -41,6 +41,8 @@ The JDBC sink provides at-least-once guarantee.
Effectively though, exactly-once can be achieved by crafting upsert SQL statements or idempotent SQL updates.
Configuration goes as follow (see also {{< javadoc file="org/apache/flink/connector/jdbc/JdbcSink.html" name="JdbcSink javadoc" >}}).
+{{< tabs "4ab65f13-607a-411a-8d24-e709f701cd4c" >}}
+{{< tab "Java" >}}
```java
JdbcSink.sink(
sqlDmlStatement, // mandatory
@@ -48,7 +50,19 @@ JdbcSink.sink(
jdbcExecutionOptions, // optional
jdbcConnectionOptions // mandatory
);
-```
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+JdbcSink.sink(
+ sql_dml_statement, # mandatory
+ type_info, # mandatory
+ jdbc_connection_options, # mandatory
+ jdbc_execution_options # optional
+)
+```
+{{< /tab >}}
+{{< /tabs >}}
### SQL DML statement and JDBC statement builder
@@ -68,6 +82,8 @@ It then repeatedly calls a user-provided function to update that prepared statem
The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also {{< javadoc name="JdbcExecutionOptions javadoc" file="org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}})
+{{< tabs "4ab65f13-607a-411a-8d24-e709f512ed6k" >}}
+{{< tab "Java" >}}
```java
JdbcExecutionOptions.builder()
.withBatchIntervalMs(200) // optional: default = 0, meaning no time-based execution is done
@@ -75,6 +91,17 @@ JdbcExecutionOptions.builder()
.withMaxRetries(5) // optional: default = 3
.build();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+JdbcExecutionOptions.builder() \
+ .with_batch_interval_ms(200) \
+ .with_batch_size(100) \
+ .with_max_retries(5) \
+ .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
A JDBC batch is executed as soon as one of the following conditions is true:
@@ -89,6 +116,8 @@ Please see {{< javadoc name="JdbcConnectionOptions javadoc" file="org/apache/fli
### Full example
+{{< tabs "4ab65f13-608a-411a-8d24-e303f348ds8d" >}}
+{{< tab "Java" >}}
```java
public class JdbcSinkExample {
@@ -139,6 +168,38 @@ public class JdbcSinkExample {
}
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING(), Types.INT()])
+env.from_collection(
+ [(101, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
+ (102, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
+ (103, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
+ (104, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
+ ], type_info=type_info) \
+ .add_sink(
+ JdbcSink.sink(
+ "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
+ type_info,
+ JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .with_url('jdbc:postgresql://dbhost:5432/postgresdb')
+ .with_driver_name('org.postgresql.Driver')
+ .with_user_name('someUser')
+ .with_password('somePassword')
+ .build(),
+ JdbcExecutionOptions.builder()
+ .with_batch_interval_ms(1000)
+ .with_batch_size(200)
+ .with_max_retries(5)
+ .build()
+ ))
+
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
## `JdbcSink.exactlyOnceSink`
@@ -153,6 +214,9 @@ To use it, create a sink using `exactlyOnceSink()` method as above and additiona
- [XA DataSource](https://docs.oracle.com/javase/8/docs/api/javax/sql/XADataSource.html) Supplier
For example:
+
+{{< tabs "4ab65f13-608a-411a-8d24-e304f627ac8f" >}}
+{{< tab "Java" >}}
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
@@ -179,13 +243,32 @@ env
});
env.execute();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
**NOTE:** Some databases only allow a single XA transaction per connection (e.g. PostgreSQL, MySQL).
In such cases, please use the following API to construct `JdbcExactlyOnceOptions`:
+
+{{< tabs "4ab65f13-608a-411a-8d24-e304f627cd4e" >}}
+{{< tab "Java" >}}
```java
JdbcExactlyOnceOptions.builder()
.withTransactionPerConnection(true)
.build();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
This will make Flink use a separate connection for every XA transaction. This may require adjusting connection limits.
For PostgreSQL and MySQL, this can be done by increasing `max_connections`.
@@ -198,28 +281,56 @@ with `JdbcExecutionOptions.maxRetries == 0`; otherwise, duplicated results maybe
### `XADataSource` examples
PostgreSQL `XADataSource` example:
+{{< tabs "4ab65f13-608a-411a-8d24-e304f323ab3a" >}}
+{{< tab "Java" >}}
```java
PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
xaDataSource.setUrl("jdbc:postgresql://localhost:5432/postgres");
xaDataSource.setUser(username);
xaDataSource.setPassword(password);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
MySQL `XADataSource` example:
+{{< tabs "4ab65f13-608a-411a-8d24-b213f323ca3c" >}}
+{{< tab "Java" >}}
```java
MysqlXADataSource xaDataSource = new com.mysql.cj.jdbc.MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://localhost:3306/");
xaDataSource.setUser(username);
xaDataSource.setPassword(password);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
Oracle `XADataSource` example:
+{{< tabs "4ab65f13-608a-411a-8d24-b213f337ad5f" >}}
+{{< tab "Java" >}}
```java
OracleXADataSource xaDataSource = new oracle.jdbc.xa.OracleXADataSource();
xaDataSource.setURL("jdbc:oracle:oci8:@");
xaDataSource.setUser("scott");
xaDataSource.setPassword("tiger");
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
Please also take Oracle connection pooling into account.
Please refer to the `JdbcXaSinkFunction` documentation for more details.
diff --git a/docs/content/docs/connectors/datastream/rabbitmq.md b/docs/content/docs/connectors/datastream/rabbitmq.md
index 543a8fe327b..3d8678aee2f 100644
--- a/docs/content/docs/connectors/datastream/rabbitmq.md
+++ b/docs/content/docs/connectors/datastream/rabbitmq.md
@@ -175,6 +175,14 @@ val connectionConfig = new RMQConnectionConfig.Builder()
.build
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+connection_config = RMQConnectionConfig.Builder() \
+ .set_prefetch_count(30000) \
+ ...
+ .build()
+```
+{{< /tab >}}
{{< /tabs >}}
The prefetch count is unset by default, meaning the RabbitMQ server will send unlimited messages. In production, it
@@ -221,6 +229,22 @@ stream.addSink(new RMQSink[String](
new SimpleStringSchema)) // serialization schema to turn Java objects to messages
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+stream = ...
+
+connection_config = RMQConnectionConfig.Builder() \
+ .set_host("localhost") \
+ .set_port(5000) \
+ ...
+ .build()
+
+stream.add_sink(RMQSink(
+ connection_config, # config for the RabbitMQ connection
+ 'queueName', # name of the RabbitMQ queue to send messages to
+ SimpleStringSchema())) # serialization schema to turn Java objects to messages
+```
+{{< /tab >}}
{{< /tabs >}}
More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
diff --git a/docs/content/docs/dev/datastream/sources.md b/docs/content/docs/dev/datastream/sources.md
index 2a9265ab453..185425265bc 100644
--- a/docs/content/docs/dev/datastream/sources.md
+++ b/docs/content/docs/dev/datastream/sources.md
@@ -117,6 +117,8 @@ The `Source` implementation is expected to pass the `SplitEnumeratorContext` to
While a `SplitEnumerator` implementation can work well in a reactive way by only taking coordination actions when its method is invoked, some `SplitEnumerator` implementations might want to take actions actively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReaders`.
Such implementations may find that the `callAsync()` method `SplitEnumeratorContext` is handy. The code snippet below shows how the `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+{{< tabs "066b6695-5bc3-4d7a-9033-ff6b1d15c3b1" >}}
+{{< tab "Java" >}}
```java
class MySplitEnumerator implements SplitEnumerator<MySplit> {
private final long DISCOVER_INTERVAL = 60_000L;
@@ -143,6 +145,13 @@ class MySplitEnumerator implements SplitEnumerator<MySplit> {
...
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
### SourceReader
@@ -194,6 +203,18 @@ val stream = env.fromSource(
...
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+
+my_source = ...
+
+env.from_source(
+ my_source,
+ WatermarkStrategy.no_watermarks(),
+ "my_source_name")
+```
+{{< /tab >}}
{{< /tabs >}}
----
@@ -240,6 +261,8 @@ As an example, as illustrated below, a `SplitFetcherManager` may have a fixed nu
The following code snippet implements this threading model.
+{{< tabs "bde5ff60-4e61-4633-a6dc-50524acb7b33" >}}
+{{< tab "Java" >}}
```java
/**
* A SplitFetcherManager that has a fixed size of split fetchers and assign splits
@@ -279,9 +302,18 @@ public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
}
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
And a `SourceReader` using this threading model can be created like following:
+{{< tabs "bde5ff60-4e61-4633-a6dc-50524aca6c31" >}}
+{{< tab "Java" >}}
```java
public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
@@ -322,6 +354,13 @@ public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, Spli
}
}
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
The `SourceReader` implementations can also implement their own threading model easily on top of the `SplitFetcherManager` and `SourceReaderBase`.
@@ -337,12 +376,25 @@ Applications based on the legacy [SourceFunction](https://github.com/apache/flin
The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
+{{< tabs "bde5ff60-4e61-4633-a6dc-50524acb7b36" >}}
+{{< tab "Java" >}}
```java
environment.fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+environment.from_source(
+ source: Source,
+ watermark_strategy: WatermarkStrategy,
+ source_name: str,
+ type_info: TypeInformation = None)
+```
+{{< /tab >}}
+{{< /tabs >}}
The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement any timestamp extraction and watermark generation code.
diff --git a/docs/content/docs/dev/table/concepts/temporal_table_function.md b/docs/content/docs/dev/table/concepts/temporal_table_function.md
index ac6665d81c3..000ae122854 100644
--- a/docs/content/docs/dev/table/concepts/temporal_table_function.md
+++ b/docs/content/docs/dev/table/concepts/temporal_table_function.md
@@ -76,6 +76,11 @@ rates = tEnv
tEnv.registerFunction("rates", rates)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
{{< /tabs >}}
## Temporal Table Function Join
@@ -126,6 +131,11 @@ val result = orders
.select($"(o_amount * r_rate).sum as amount"))
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
{{< /tabs >}}
{{< top >}}
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 3a574e39995..9ef43ec1707 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -85,6 +85,23 @@ class MyMapper extends RichMapFunction[String,String] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+ def __init__(self):
+ self.counter = None
+
+ def open(self, runtime_context: RuntimeContext):
+ self.counter = runtime_context \
+ .get_metrics_group() \
+ .counter("my_counter")
+
+ def map(self, value: str):
+ self.counter.inc()
+ return value
```
{{< /tab >}}
{{< /tabs >}}
@@ -133,6 +150,11 @@ class MyMapper extends RichMapFunction[String,String] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -189,6 +211,24 @@ new class MyMapper extends RichMapFunction[String,String] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapper(MapFunction):
+ def __init__(self):
+ self.value_to_expose = 0
+
+ def open(self, runtime_context: RuntimeContext):
+ runtime_context \
+ .get_metrics_group() \
+ .gauge("my_gauge", lambda: self.value_to_expose)
+
+ def map(self, value: str):
+ self.value_to_expose += 1
+ return value
+
```
{{< /tab >}}
{{< /tabs >}}
@@ -239,6 +279,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -300,6 +345,11 @@ class MyMapper extends RichMapFunction[Long, Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -348,6 +398,25 @@ class MyMapper extends RichMapFunction[Long,Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+class MyMapperMeter(MapFunction):
+ def __init__(self):
+ self.meter = None
+
+ def open(self, runtime_context: RuntimeContext):
+ # an average rate of events per second over 120s, default is 60s.
+ self.meter = runtime_context \
+ .get_metrics_group() \
+ .meter("my_meter", time_span_in_seconds=120)
+
+ def map(self, value: str):
+ self.meter.mark_event()
+ return value
+
```
{{< /tab >}}
{{< /tabs >}}
@@ -407,6 +476,11 @@ class MyMapper extends RichMapFunction[Long,Long] {
}
}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
```
{{< /tab >}}
{{< /tabs >}}
@@ -454,6 +528,21 @@ counter = getRuntimeContext()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+counter = runtime_context \
+ .get_metrics_group() \
+ .add_group("my_metrics") \
+ .counter("my_counter")
+
+counter = runtime_context \
+ .get_metrics_group() \
+ .add_group("my_metrics_key", "my_metrics_value") \
+ .counter("my_counter")
+
```
{{< /tab >}}
{{< /tabs >}}