Posted to commits@flink.apache.org by di...@apache.org on 2022/07/01 05:37:17 UTC

[flink] branch release-1.15 updated: [FLINK-28140][python][docs] Improve the documentation by adding Python examples in DataStream API Integration page

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 898dc492760 [FLINK-28140][python][docs] Improve the documentation by adding Python examples in DataStream API Integration page
898dc492760 is described below

commit 898dc4927608fee4df801e7abff879fb1010ddc5
Author: Mingde Peng <pe...@gmail.com>
AuthorDate: Tue Jun 28 09:31:05 2022 +0800

    [FLINK-28140][python][docs] Improve the documentation by adding Python examples in DataStream API Integration page
    
    This closes #20121.
---
 docs/content.zh/docs/dev/table/data_stream_api.md | 300 ++++++++++++++++++++++
 docs/content/docs/dev/table/data_stream_api.md    | 300 ++++++++++++++++++++++
 2 files changed, 600 insertions(+)

diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index 396911e3048..f9c5f4dbbc4 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -405,6 +405,36 @@ val tableEnv = StreamTableEnvironment.create(env)
 // +U[Alice, 112]
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import StreamTableEnvironment
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# set the batch runtime mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# uncomment this for streaming mode
+# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+
+# setup Table API
+# the table environment adopts the runtime mode during initialization
+table_env = StreamTableEnvironment.create(env)
+
+# define the same pipeline as above
+# prints in BATCH mode:
+# +I[Bob, 10]
+# +I[Alice, 112]
+
+# prints in STREAMING mode:
+# +I[Alice, 12]
+# +I[Bob, 10]
+# -U[Alice, 12]
+# +U[Alice, 112]
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Once the changelog is applied to an external system (e.g. a key-value store), one can see that both
@@ -467,6 +497,15 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+# imports for Python DataStream API
+from pyflink.datastream import *
+
+# imports for Table API with bridging to Python DataStream API
+from pyflink.table import *
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Please refer to the [configuration]({{< ref "docs/dev/configuration/overview" >}}) section for more details.
@@ -598,6 +637,8 @@ declare a final sink. Both `TableEnvironment` and also `StreamTableEnvironment`
 general `execute()` method. Instead, they offer methods for submitting a single source-to-sink
 pipeline or a statement set:
 
+{{< tabs "47a32814-abea-11eb-8529-0242ac133403" >}}
+{{< tab "Java" >}}
 ```java
 // execute with explicit sink
 tableEnv.from("InputTable").insertInto("OutputTable").execute();
@@ -620,6 +661,31 @@ tableEnv.from("InputTable").execute().print();
 
 tableEnv.executeSql("SELECT * FROM InputTable").print();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# execute with explicit sink
+table_env.from_path("input_table").execute_insert("output_table")
+
+table_env.execute_sql("INSERT INTO output_table SELECT * FROM input_table")
+
+table_env.create_statement_set() \
+    .add_insert("output_table", input_table) \
+    .add_insert("output_table2", input_table) \
+    .execute()
+
+table_env.create_statement_set() \
+    .add_insert_sql("INSERT INTO output_table SELECT * FROM input_table") \
+    .add_insert_sql("INSERT INTO output_table2 SELECT * FROM input_table") \
+    .execute()
+
+# execute with implicit local sink
+table_env.from_path("input_table").execute().print()
+
+table_env.execute_sql("SELECT * FROM input_table").print()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
 or `StreamTableEnvironment.toChangelogStream` will materialize (i.e. compile) the Table API sub-pipeline
@@ -627,6 +693,8 @@ and insert it into the DataStream API pipeline builder. This means that `StreamE
 or `DataStream.executeAndCollect` must be called afterwards. An execution in Table API will not trigger
 these "external parts".
 
+{{< tabs "47a32814-abea-11eb-8529-0242ac133504" >}}
+{{< tab "Java" >}}
 ```java
 // (1)
 
@@ -643,6 +711,25 @@ table.execute().print();
 // Flink job, (2) was already running before
 env.execute();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# (1)
+
+# adds a branch with a printing sink to the StreamExecutionEnvironment
+table_env.to_data_stream(table).print()
+
+# (2)
+# executes a Table API end-to-end pipeline as a Flink job and prints locally,
+# thus (1) has still not been executed
+table.execute().print()
+
+# executes the DataStream API pipeline with the sink defined in (1) as a
+# Flink job, (2) was already running before
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 {{< top >}}
 
@@ -705,6 +792,24 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+# adopt mode from StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+table_env = StreamTableEnvironment.create(env)
+
+# or
+
+# set mode explicitly for StreamTableEnvironment
+# it will be propagated to StreamExecutionEnvironment during planning
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, EnvironmentSettings.in_batch_mode())
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 One must meet the following prerequisites before setting the runtime mode to `BATCH`:
@@ -808,6 +913,35 @@ tableEnv.toDataStream(table)
 // ...
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table import TableDescriptor, Schema, DataTypes
+
+table = table_env.from_descriptor(
+    TableDescriptor.for_connector("datagen")
+        .option("number-of-rows", "10")
+        .schema(
+            Schema.new_builder()
+                .column("uid", DataTypes.TINYINT())
+                .column("payload", DataTypes.STRING())
+                .build())
+        .build())
+
+# convert the Table to a DataStream and further transform the pipeline
+collect = table_env.to_data_stream(table) \
+    .key_by(lambda r: r[0]) \
+    .map(lambda r: "My custom operator: " + r[1]) \
+    .execute_and_collect()
+
+for c in collect:
+    print(c)
+
+# prints:
+# My custom operator: 9660912d30a43c7b035e15bd...
+# My custom operator: 29f5f706d2144f4a4f9f52a0...
+# ...
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ### Changelog Unification
@@ -824,6 +958,8 @@ in the resulting changelog stream. The example joins two tables in SQL (`UserTab
 an interval join based on the time attributes in both tables (`ts`). It uses DataStream API to implement
 a custom operator that deduplicates the user name using a `KeyedProcessFunction` and value state.
 
+{{< tabs "3f5f5d4e-cd03-48d1-9309-917a6cf66aba" >}}
+{{< tab "Java" >}}
 ```java
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.state.ValueState;
@@ -940,6 +1076,107 @@ env.execute();
 // Bob
 // Alice
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from datetime import datetime
+from pyflink.common import Row, Types
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, \
+    KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment, Schema, DataTypes
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# use BATCH or STREAMING mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# setup Table API
+table_env = StreamTableEnvironment.create(env)
+
+# create a user stream
+t_format = "%Y-%m-%dT%H:%M:%S"
+user_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:00:00", t_format), 1, "Alice"),
+     Row(datetime.strptime("2021-08-21T13:05:00", t_format), 2, "Bob"),
+     Row(datetime.strptime("2021-08-21T13:10:00", t_format), 2, "Bob")],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "name"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.STRING()]))
+
+# create an order stream
+order_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:02:00", t_format), 1, 122),
+     Row(datetime.strptime("2021-08-21T13:07:00", t_format), 2, 239),
+     Row(datetime.strptime("2021-08-21T13:11:00", t_format), 2, 999)],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "amount"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.INT()]))
+
+# create corresponding tables
+table_env.create_temporary_view(
+    "user_table",
+    user_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+table_env.create_temporary_view(
+    "order_table",
+    order_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("amount", DataTypes.INT())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+# perform interval join
+joined_table = table_env.sql_query(
+    "SELECT U.name, O.amount " +
+    "FROM user_table U, order_table O " +
+    "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES")
+
+joined_stream = table_env.to_data_stream(joined_table)
+
+joined_stream.print()
+
+# implement a custom operator using KeyedProcessFunction and value state
+class MyProcessFunction(KeyedProcessFunction):
+
+    def __init__(self):
+        self.seen = None
+
+    def open(self, runtime_context: RuntimeContext):
+        state_descriptor = ValueStateDescriptor("seen", Types.STRING())
+        self.seen = runtime_context.get_state(state_descriptor)
+
+    def process_element(self, value, ctx):
+        name = value[0]
+        if self.seen.value() is None:
+            self.seen.update(name)
+            yield name
+
+joined_stream \
+    .key_by(lambda r: r[0]) \
+    .process(MyProcessFunction()) \
+    .print()
+
+# execute unified pipeline
+env.execute()
+
+# prints (in both BATCH and STREAMING mode):
+# +I[Bob, 239]
+# +I[Alice, 122]
+# +I[Bob, 999]
+#
+# Bob
+# Alice
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 {{< top >}}
 
@@ -1422,6 +1659,9 @@ data structures. The following example in Java shows what is possible. Check als
 [Data Types & Serialization]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}) page of
 the DataStream API for more information about the supported types there.
 
+
+{{< tabs "079cdf25-21ef-4393-ad69-623510038b1b" >}}
+{{< tab "Java" >}}
 ```java
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.DataTypes;
@@ -1494,6 +1734,13 @@ table.printSchema();
 //  `user` *User<`name` STRING,`score` INT>*
 // )
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# Custom POJO classes are currently not supported in PyFlink.
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 ### Examples for `createTemporaryView`
 
@@ -2641,6 +2888,59 @@ env.execute()
 // +I[3]
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common import Encoder
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import StreamingFileSink
+from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+statement_set = table_env.create_statement_set()
+
+# create some source
+source_descriptor = TableDescriptor.for_connector("datagen") \
+    .option("number-of-rows", "3") \
+    .schema(
+        Schema.new_builder()
+            .column("my_col", DataTypes.INT())
+            .column("my_other_col", DataTypes.BOOLEAN())
+            .build()) \
+    .build()
+
+# create some sink
+sink_descriptor = TableDescriptor.for_connector("print").build()
+
+# add a pure Table API pipeline
+table_from_source = table_env.from_descriptor(source_descriptor)
+statement_set.add_insert(sink_descriptor, table_from_source)
+
+
+# use table sinks for the DataStream API pipeline
+data_stream = env.from_collection([1, 2, 3])
+table_from_stream = table_env.from_data_stream(data_stream)
+statement_set.add_insert(sink_descriptor, table_from_stream)
+
+# define other DataStream API parts
+env.from_collection([4, 5, 6]) \
+    .add_sink(StreamingFileSink
+              .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+              .build())
+
+# use DataStream API to submit the pipelines
+env.execute()
+
+# prints similar to:
+# +I[1618440447, false]
+# +I[1259693645, true]
+# +I[158588930, false]
+# +I[1]
+# +I[2]
+# +I[3]
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 {{< top >}}
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md
index 6dd85151de2..839f1a516a4 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -403,6 +403,36 @@ val tableEnv = StreamTableEnvironment.create(env)
 // +U[Alice, 112]
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import StreamTableEnvironment
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# set the batch runtime mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# uncomment this for streaming mode
+# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+
+# setup Table API
+# the table environment adopts the runtime mode during initialization
+table_env = StreamTableEnvironment.create(env)
+
+# define the same pipeline as above
+# prints in BATCH mode:
+# +I[Bob, 10]
+# +I[Alice, 112]
+
+# prints in STREAMING mode:
+# +I[Alice, 12]
+# +I[Bob, 10]
+# -U[Alice, 12]
+# +U[Alice, 112]
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Once the changelog is applied to an external system (e.g. a key-value store), one can see that both
@@ -465,6 +495,15 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+# imports for Python DataStream API
+from pyflink.datastream import *
+
+# imports for Table API with bridging to Python DataStream API
+from pyflink.table import *
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Please refer to the [configuration]({{< ref "docs/dev/configuration/overview" >}}) section for more information. 
@@ -596,6 +635,8 @@ declare a final sink. Both `TableEnvironment` and also `StreamTableEnvironment`
 general `execute()` method. Instead, they offer methods for submitting a single source-to-sink
 pipeline or a statement set:
 
+{{< tabs "47a32814-abea-11eb-8529-0242ac133404" >}}
+{{< tab "Java" >}}
 ```java
 // execute with explicit sink
 tableEnv.from("InputTable").insertInto("OutputTable").execute();
@@ -618,6 +659,31 @@ tableEnv.from("InputTable").execute().print();
 
 tableEnv.executeSql("SELECT * FROM InputTable").print();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# execute with explicit sink
+table_env.from_path("input_table").execute_insert("output_table")
+
+table_env.execute_sql("INSERT INTO output_table SELECT * FROM input_table")
+
+table_env.create_statement_set() \
+    .add_insert("output_table", input_table) \
+    .add_insert("output_table2", input_table) \
+    .execute()
+
+table_env.create_statement_set() \
+    .add_insert_sql("INSERT INTO output_table SELECT * FROM input_table") \
+    .add_insert_sql("INSERT INTO output_table2 SELECT * FROM input_table") \
+    .execute()
+
+# execute with implicit local sink
+table_env.from_path("input_table").execute().print()
+
+table_env.execute_sql("SELECT * FROM input_table").print()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
 or `StreamTableEnvironment.toChangelogStream` will materialize (i.e. compile) the Table API sub-pipeline
@@ -625,6 +691,8 @@ and insert it into the DataStream API pipeline builder. This means that `StreamE
 or `DataStream.executeAndCollect` must be called afterwards. An execution in Table API will not trigger
 these "external parts".
 
+{{< tabs "47a32814-abea-11eb-8529-0242ac133505" >}}
+{{< tab "Java" >}}
 ```java
 // (1)
 
@@ -641,6 +709,25 @@ table.execute().print();
 // Flink job, (2) was already running before
 env.execute();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# (1)
+
+# adds a branch with a printing sink to the StreamExecutionEnvironment
+table_env.to_data_stream(table).print()
+
+# (2)
+# executes a Table API end-to-end pipeline as a Flink job and prints locally,
+# thus (1) has still not been executed
+table.execute().print()
+
+# executes the DataStream API pipeline with the sink defined in (1) as a
+# Flink job, (2) was already running before
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 {{< top >}}
 
@@ -703,6 +790,24 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+# adopt mode from StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+table_env = StreamTableEnvironment.create(env)
+
+# or
+
+# set mode explicitly for StreamTableEnvironment
+# it will be propagated to StreamExecutionEnvironment during planning
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, EnvironmentSettings.in_batch_mode())
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 One must meet the following prerequisites before setting the runtime mode to `BATCH`:
@@ -806,6 +911,35 @@ tableEnv.toDataStream(table)
 // ...
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table import TableDescriptor, Schema, DataTypes
+
+table = table_env.from_descriptor(
+    TableDescriptor.for_connector("datagen")
+        .option("number-of-rows", "10")
+        .schema(
+            Schema.new_builder()
+                .column("uid", DataTypes.TINYINT())
+                .column("payload", DataTypes.STRING())
+                .build())
+        .build())
+
+# convert the Table to a DataStream and further transform the pipeline
+collect = table_env.to_data_stream(table) \
+    .key_by(lambda r: r[0]) \
+    .map(lambda r: "My custom operator: " + r[1]) \
+    .execute_and_collect()
+
+for c in collect:
+    print(c)
+
+# prints:
+# My custom operator: 9660912d30a43c7b035e15bd...
+# My custom operator: 29f5f706d2144f4a4f9f52a0...
+# ...
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ### Changelog Unification
@@ -822,6 +956,8 @@ in the resulting changelog stream. The example joins two tables in SQL (`UserTab
 an interval join based on the time attributes in both tables (`ts`). It uses DataStream API to implement
 a custom operator that deduplicates the user name using a `KeyedProcessFunction` and value state.
 
+{{< tabs "3f5f5d4e-cd03-48d1-9309-917a6cf66aba" >}}
+{{< tab "Java" >}}
 ```java
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.state.ValueState;
@@ -938,6 +1074,107 @@ env.execute();
 // Bob
 // Alice
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from datetime import datetime
+from pyflink.common import Row, Types
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, \
+    KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment, Schema, DataTypes
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# use BATCH or STREAMING mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# setup Table API
+table_env = StreamTableEnvironment.create(env)
+
+# create a user stream
+t_format = "%Y-%m-%dT%H:%M:%S"
+user_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:00:00", t_format), 1, "Alice"),
+     Row(datetime.strptime("2021-08-21T13:05:00", t_format), 2, "Bob"),
+     Row(datetime.strptime("2021-08-21T13:10:00", t_format), 2, "Bob")],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "name"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.STRING()]))
+
+# create an order stream
+order_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:02:00", t_format), 1, 122),
+     Row(datetime.strptime("2021-08-21T13:07:00", t_format), 2, 239),
+     Row(datetime.strptime("2021-08-21T13:11:00", t_format), 2, 999)],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "amount"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.INT()]))
+
+# create corresponding tables
+table_env.create_temporary_view(
+    "user_table",
+    user_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+table_env.create_temporary_view(
+    "order_table",
+    order_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("amount", DataTypes.INT())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+# perform interval join
+joined_table = table_env.sql_query(
+    "SELECT U.name, O.amount " +
+    "FROM user_table U, order_table O " +
+    "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES")
+
+joined_stream = table_env.to_data_stream(joined_table)
+
+joined_stream.print()
+
+# implement a custom operator using KeyedProcessFunction and value state
+class MyProcessFunction(KeyedProcessFunction):
+
+    def __init__(self):
+        self.seen = None
+
+    def open(self, runtime_context: RuntimeContext):
+        state_descriptor = ValueStateDescriptor("seen", Types.STRING())
+        self.seen = runtime_context.get_state(state_descriptor)
+
+    def process_element(self, value, ctx):
+        name = value[0]
+        if self.seen.value() is None:
+            self.seen.update(name)
+            yield name
+
+joined_stream \
+    .key_by(lambda r: r[0]) \
+    .process(MyProcessFunction()) \
+    .print()
+
+# execute unified pipeline
+env.execute()
+
+# prints (in both BATCH and STREAMING mode):
+# +I[Bob, 239]
+# +I[Alice, 122]
+# +I[Bob, 999]
+#
+# Bob
+# Alice
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 {{< top >}}
 
@@ -1420,6 +1657,9 @@ data structures. The following example in Java shows what is possible. Check als
 [Data Types & Serialization]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}) page of
 the DataStream API for more information about the supported types there.
 
+
+{{< tabs "079cdf25-21ef-4393-ad69-623510038b3b" >}}
+{{< tab "Java" >}}
 ```java
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.DataTypes;
@@ -1492,6 +1732,13 @@ table.printSchema();
 //  `user` *User<`name` STRING,`score` INT>*
 // )
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# Custom POJO classes are currently not supported in PyFlink.
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 ### Examples for `createTemporaryView`
 
@@ -2639,6 +2886,59 @@ env.execute()
 // +I[3]
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common import Encoder
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import StreamingFileSink
+from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+statement_set = table_env.create_statement_set()
+
+# create some source
+source_descriptor = TableDescriptor.for_connector("datagen") \
+    .option("number-of-rows", "3") \
+    .schema(
+        Schema.new_builder()
+            .column("my_col", DataTypes.INT())
+            .column("my_other_col", DataTypes.BOOLEAN())
+            .build()) \
+    .build()
+
+# create some sink
+sink_descriptor = TableDescriptor.for_connector("print").build()
+
+# add a pure Table API pipeline
+table_from_source = table_env.from_descriptor(source_descriptor)
+statement_set.add_insert(sink_descriptor, table_from_source)
+
+
+# use table sinks for the DataStream API pipeline
+data_stream = env.from_collection([1, 2, 3])
+table_from_stream = table_env.from_data_stream(data_stream)
+statement_set.add_insert(sink_descriptor, table_from_stream)
+
+# define other DataStream API parts
+env.from_collection([4, 5, 6]) \
+    .add_sink(StreamingFileSink
+              .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+              .build())
+
+# use DataStream API to submit the pipelines
+env.execute()
+
+# prints similar to:
+# +I[1618440447, false]
+# +I[1259693645, true]
+# +I[158588930, false]
+# +I[1]
+# +I[2]
+# +I[3]
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 {{< top >}}
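
The first Python example in this patch lists different printed changelogs for BATCH and STREAMING mode, and the surrounding text notes that both give the same result once applied to an external system such as a key-value store. The following is a minimal pure-Python sketch of that claim, not PyFlink code. The helper `apply_changelog` and the `(kind, key, value)` tuple layout are hypothetical and introduced only for illustration; the row kinds `+I`, `-U`, `+U` are taken from the printed output, and `-D` is assumed to mean a delete.

```python
# Minimal sketch: materialize a Flink-style changelog into a key-value store.
# apply_changelog and the tuple layout are illustrative, not part of PyFlink.

def apply_changelog(changes):
    """Apply (kind, key, value) changes in order and return the final store."""
    store = {}
    for kind, key, value in changes:
        if kind in ("+I", "+U"):
            # insert or update-after: upsert the new value
            store[key] = value
        elif kind in ("-U", "-D"):
            # update-before or delete: retract the row only if it still matches
            if store.get(key) == value:
                del store[key]
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return store


# changelog printed in BATCH mode
batch_changes = [("+I", "Bob", 10), ("+I", "Alice", 112)]

# changelog printed in STREAMING mode
streaming_changes = [
    ("+I", "Alice", 12),
    ("+I", "Bob", 10),
    ("-U", "Alice", 12),
    ("+U", "Alice", 112),
]

batch_result = apply_changelog(batch_changes)
streaming_result = apply_changelog(streaming_changes)

# both modes end in the same table content
print(batch_result == streaming_result)
```

Both lists reduce to the same final mapping, so a sink that applies the changelog sees identical content regardless of the runtime mode.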