Posted to commits@flink.apache.org by di...@apache.org on 2022/07/01 05:37:17 UTC
[flink] branch release-1.15 updated: [FLINK-28140][python][docs] Improve the documentation by adding Python examples in DataStream API Integration page
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 898dc492760 [FLINK-28140][python][docs] Improve the documentation by adding Python examples in DataStream API Integration page
898dc492760 is described below
commit 898dc4927608fee4df801e7abff879fb1010ddc5
Author: Mingde Peng <pe...@gmail.com>
AuthorDate: Tue Jun 28 09:31:05 2022 +0800
[FLINK-28140][python][docs] Improve the documentation by adding Python examples in DataStream API Integration page
This closes #20121.
---
docs/content.zh/docs/dev/table/data_stream_api.md | 300 ++++++++++++++++++++++
docs/content/docs/dev/table/data_stream_api.md | 300 ++++++++++++++++++++++
2 files changed, 600 insertions(+)
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index 396911e3048..f9c5f4dbbc4 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -405,6 +405,36 @@ val tableEnv = StreamTableEnvironment.create(env)
// +U[Alice, 112]
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import StreamTableEnvironment
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# set the batch runtime mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# uncomment this for streaming mode
+# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+
+# setup Table API
+# the table environment adopts the runtime mode during initialization
+table_env = StreamTableEnvironment.create(env)
+
+# define the same pipeline as above
+# prints in BATCH mode:
+# +I[Bob, 10]
+# +I[Alice, 112]
+
+# prints in STREAMING mode:
+# +I[Alice, 12]
+# +I[Bob, 10]
+# -U[Alice, 12]
+# +U[Alice, 112]
+```
+{{< /tab >}}
{{< /tabs >}}
Once the changelog is applied to an external system (e.g. a key-value store), one can see that both
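For illustration, here is a minimal sketch (not part of this commit) of how such a key-value store might consume the changelog printed above; the (op, name, score) row layout is an assumption for this example only:
```python
# hypothetical changelog consumer; op codes follow the +I/-U/+U/-D convention above
store = {}
for op, name, score in [("+I", "Alice", 12), ("+I", "Bob", 10),
                        ("-U", "Alice", 12), ("+U", "Alice", 112)]:
    if op in ("+I", "+U"):
        store[name] = score       # insert / update-after writes the new value
    elif op in ("-U", "-D"):
        store.pop(name, None)     # retraction removes the old value
print(store)  # {'Bob': 10, 'Alice': 112}
```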
@@ -467,6 +497,15 @@ import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# imports for Python DataStream API
+from pyflink.datastream import *
+
+# imports for Table API to Python DataStream API
+from pyflink.table import *
+```
+{{< /tab >}}
{{< /tabs >}}
Please refer to the [configuration]({{< ref "docs/dev/configuration/overview" >}}) section for more details.
@@ -598,6 +637,8 @@ declare a final sink. Both `TableEnvironment` and also `StreamTableEnvironment`
general `execute()` method. Instead, they offer methods for submitting a single source-to-sink
pipeline or a statement set:
+{{< tabs "47a32814-abea-11eb-8529-0242ac133403" >}}
+{{< tab "Java" >}}
```java
// execute with explicit sink
tableEnv.from("InputTable").insertInto("OutputTable").execute();
@@ -620,6 +661,31 @@ tableEnv.from("InputTable").execute().print();
tableEnv.executeSql("SELECT * FROM InputTable").print();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# execute with explicit sink
+table_env.from_path("input_table").execute_insert("output_table")
+
+table_env.execute_sql("INSERT INTO output_table SELECT * FROM input_table")
+
+table_env.create_statement_set() \
+    .add_insert("output_table", input_table) \
+    .add_insert("output_table2", input_table) \
+    .execute()
+
+table_env.create_statement_set() \
+    .add_insert_sql("INSERT INTO output_table SELECT * FROM input_table") \
+    .add_insert_sql("INSERT INTO output_table2 SELECT * FROM input_table") \
+    .execute()
+
+# execute with implicit local sink
+table_env.from_path("input_table").execute().print()
+
+table_env.execute_sql("SELECT * FROM input_table").print()
+```
+{{< /tab >}}
+{{< /tabs >}}
To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
or `StreamTableEnvironment.toChangelogStream` will materialize (i.e. compile) the Table API sub-pipeline
@@ -627,6 +693,8 @@ and insert it into the DataStream API pipeline builder. This means that `StreamE
or `DataStream.executeAndCollect` must be called afterwards. An execution in Table API will not trigger
these "external parts".
+{{< tabs "47a32814-abea-11eb-8529-0242ac133504" >}}
+{{< tab "Java" >}}
```java
// (1)
@@ -643,6 +711,25 @@ table.execute().print();
// Flink job, (2) was already running before
env.execute();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# (1)
+
+# adds a branch with a printing sink to the StreamExecutionEnvironment
+table_env.to_data_stream(table).print()
+
+# (2)
+# executes a Table API end-to-end pipeline as a Flink job and prints locally,
+# thus (1) has still not been executed
+table.execute().print()
+
+# executes the DataStream API pipeline with the sink defined in (1) as a
+# Flink job, (2) was already running before
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
{{< top >}}
@@ -705,6 +792,24 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+# adopt mode from StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+table_env = StreamTableEnvironment.create(env)
+
+# or
+
+# set mode explicitly for StreamTableEnvironment
+# it will be propagated to StreamExecutionEnvironment during planning
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, EnvironmentSettings.in_batch_mode())
+```
+{{< /tab >}}
{{< /tabs >}}
One must meet the following prerequisites before setting the runtime mode to `BATCH`:
@@ -808,6 +913,35 @@ tableEnv.toDataStream(table)
// ...
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table import TableDescriptor, Schema, DataTypes
+
+table = table_env.from_descriptor(
+    TableDescriptor.for_connector("datagen")
+        .option("number-of-rows", "10")
+        .schema(
+            Schema.new_builder()
+                .column("uid", DataTypes.TINYINT())
+                .column("payload", DataTypes.STRING())
+                .build())
+        .build())
+
+# convert the Table to a DataStream and further transform the pipeline
+collect = table_env.to_data_stream(table) \
+    .key_by(lambda r: r[0]) \
+    .map(lambda r: "My custom operator: " + r[1]) \
+    .execute_and_collect()
+
+for c in collect:
+    print(c)
+
+# prints:
+# My custom operator: 9660912d30a43c7b035e15bd...
+# My custom operator: 29f5f706d2144f4a4f9f52a0...
+# ...
+```
+{{< /tab >}}
{{< /tabs >}}
### Changelog Unification
@@ -824,6 +958,8 @@ in the resulting changelog stream. The example joins two tables in SQL (`UserTab
an interval join based on the time attributes in both tables (`ts`). It uses DataStream API to implement
a custom operator that deduplicates the user name using a `KeyedProcessFunction` and value state.
+{{< tabs "3f5f5d4e-cd03-48d1-9309-917a6cf66aba" >}}
+{{< tab "Java" >}}
```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
@@ -940,6 +1076,107 @@ env.execute();
// Bob
// Alice
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from datetime import datetime
+from pyflink.common import Row, Types
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, \
+    KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment, Schema, DataTypes
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# use BATCH or STREAMING mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# setup Table API
+table_env = StreamTableEnvironment.create(env)
+
+# create a user stream
+t_format = "%Y-%m-%dT%H:%M:%S"
+user_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:00:00", t_format), 1, "Alice"),
+     Row(datetime.strptime("2021-08-21T13:05:00", t_format), 2, "Bob"),
+     Row(datetime.strptime("2021-08-21T13:10:00", t_format), 2, "Bob")],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "name"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.STRING()]))
+
+# create an order stream
+order_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:02:00", t_format), 1, 122),
+     Row(datetime.strptime("2021-08-21T13:07:00", t_format), 2, 239),
+     Row(datetime.strptime("2021-08-21T13:11:00", t_format), 2, 999)],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "amount"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.INT()]))
+
+# create corresponding tables
+table_env.create_temporary_view(
+    "user_table",
+    user_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+table_env.create_temporary_view(
+    "order_table",
+    order_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("amount", DataTypes.INT())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+# perform interval join
+joined_table = table_env.sql_query(
+    "SELECT U.name, O.amount " +
+    "FROM user_table U, order_table O " +
+    "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES")
+
+joined_stream = table_env.to_data_stream(joined_table)
+
+joined_stream.print()
+
+# implement a custom operator using ProcessFunction and value state
+class MyProcessFunction(KeyedProcessFunction):
+
+    def __init__(self):
+        self.seen = None
+
+    def open(self, runtime_context: RuntimeContext):
+        state_descriptor = ValueStateDescriptor("seen", Types.STRING())
+        self.seen = runtime_context.get_state(state_descriptor)
+
+    def process_element(self, value, ctx):
+        name = value[0]
+        if self.seen.value() is None:
+            self.seen.update(name)
+            yield name
+
+joined_stream \
+    .key_by(lambda r: r[0]) \
+    .process(MyProcessFunction()) \
+    .print()
+
+# execute unified pipeline
+env.execute()
+
+# prints (in both BATCH and STREAMING mode):
+# +I[Bob, 239]
+# +I[Alice, 122]
+# +I[Bob, 999]
+#
+# Bob
+# Alice
+```
+{{< /tab >}}
+{{< /tabs >}}
{{< top >}}
@@ -1422,6 +1659,9 @@ data structures. The following example in Java shows what is possible. Check als
[Data Types & Serialization]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}) page of
the DataStream API for more information about the supported types there.
+
+{{< tabs "079cdf25-21ef-4393-ad69-623510038b1b" >}}
+{{< tab "Java" >}}
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
@@ -1494,6 +1734,13 @@ table.printSchema();
// `user` *User<`name` STRING,`score` INT>*
// )
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# Custom POJO classes are currently not supported in PyFlink.
+```
+{{< /tab >}}
+{{< /tabs >}}
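Since custom POJO classes are not supported in PyFlink, a hedged sketch (not part of this commit) of the usual workaround is to carry the fields in a `Row` with explicit type information; `env` and `table_env` are assumed to exist as in the examples above:
```python
# a minimal sketch: Row with explicit type info plays the role of the Java User POJO
from pyflink.common import Row, Types

ds = env.from_collection(
    [Row("Alice", 12), Row("Bob", 10)],
    type_info=Types.ROW_NAMED(["name", "score"], [Types.STRING(), Types.INT()]))

table = table_env.from_data_stream(ds)
table.print_schema()
# prints a schema with a `name` STRING column and a `score` INT column
```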
### Examples for `createTemporaryView`
@@ -2641,6 +2888,59 @@ env.execute()
// +I[3]
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common import Encoder
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import StreamingFileSink
+from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+statement_set = table_env.create_statement_set()
+
+# create some source
+source_descriptor = TableDescriptor.for_connector("datagen") \
+    .option("number-of-rows", "3") \
+    .schema(
+        Schema.new_builder()
+            .column("my_col", DataTypes.INT())
+            .column("my_other_col", DataTypes.BOOLEAN())
+            .build()) \
+    .build()
+
+# create some sink
+sink_descriptor = TableDescriptor.for_connector("print").build()
+
+# add a pure Table API pipeline
+table_from_source = table_env.from_descriptor(source_descriptor)
+statement_set.add_insert(sink_descriptor, table_from_source)
+
+
+# use table sinks for the DataStream API pipeline
+data_stream = env.from_collection([1, 2, 3])
+table_from_stream = table_env.from_data_stream(data_stream)
+statement_set.add_insert(sink_descriptor, table_from_stream)
+
+# define other DataStream API parts
+env.from_collection([4, 5, 6]) \
+    .add_sink(StreamingFileSink
+              .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+              .build())
+
+# use DataStream API to submit the pipelines
+env.execute()
+
+# prints similar to:
+# +I[1618440447, false]
+# +I[1259693645, true]
+# +I[158588930, false]
+# +I[1]
+# +I[2]
+# +I[3]
+```
+{{< /tab >}}
{{< /tabs >}}
{{< top >}}
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md
index 6dd85151de2..839f1a516a4 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -403,6 +403,36 @@ val tableEnv = StreamTableEnvironment.create(env)
// +U[Alice, 112]
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import StreamTableEnvironment
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# set the batch runtime mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# uncomment this for streaming mode
+# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+
+# setup Table API
+# the table environment adopts the runtime mode during initialization
+table_env = StreamTableEnvironment.create(env)
+
+# define the same pipeline as above
+# prints in BATCH mode:
+# +I[Bob, 10]
+# +I[Alice, 112]
+
+# prints in STREAMING mode:
+# +I[Alice, 12]
+# +I[Bob, 10]
+# -U[Alice, 12]
+# +U[Alice, 112]
+```
+{{< /tab >}}
{{< /tabs >}}
Once the changelog is applied to an external system (e.g. a key-value store), one can see that both
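As in the Chinese copy above, a small illustrative sketch (not part of this commit) of materializing this changelog into a key-value store; the (op, name, score) row layout is assumed:
```python
# hypothetical helper; op codes follow the +I/-U/+U/-D convention shown above
def apply_changelog_row(store, op, name, score):
    if op in ("+I", "+U"):
        store[name] = score       # write the new value
    elif op in ("-U", "-D"):
        store.pop(name, None)     # retract the old value

store = {}
for row in [("+I", "Alice", 12), ("+I", "Bob", 10),
            ("-U", "Alice", 12), ("+U", "Alice", 112)]:
    apply_changelog_row(store, *row)
print(store)  # {'Bob': 10, 'Alice': 112}
```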
@@ -465,6 +495,15 @@ import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# imports for Python DataStream API
+from pyflink.datastream import *
+
+# imports for Table API to Python DataStream API
+from pyflink.table import *
+```
+{{< /tab >}}
{{< /tabs >}}
Please refer to the [configuration]({{< ref "docs/dev/configuration/overview" >}}) section for more information.
@@ -596,6 +635,8 @@ declare a final sink. Both `TableEnvironment` and also `StreamTableEnvironment`
general `execute()` method. Instead, they offer methods for submitting a single source-to-sink
pipeline or a statement set:
+{{< tabs "47a32814-abea-11eb-8529-0242ac133404" >}}
+{{< tab "Java" >}}
```java
// execute with explicit sink
tableEnv.from("InputTable").insertInto("OutputTable").execute();
@@ -618,6 +659,31 @@ tableEnv.from("InputTable").execute().print();
tableEnv.executeSql("SELECT * FROM InputTable").print();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# execute with explicit sink
+table_env.from_path("input_table").execute_insert("output_table")
+
+table_env.execute_sql("INSERT INTO output_table SELECT * FROM input_table")
+
+table_env.create_statement_set() \
+    .add_insert("output_table", input_table) \
+    .add_insert("output_table2", input_table) \
+    .execute()
+
+table_env.create_statement_set() \
+    .add_insert_sql("INSERT INTO output_table SELECT * FROM input_table") \
+    .add_insert_sql("INSERT INTO output_table2 SELECT * FROM input_table") \
+    .execute()
+
+# execute with implicit local sink
+table_env.from_path("input_table").execute().print()
+
+table_env.execute_sql("SELECT * FROM input_table").print()
+```
+{{< /tab >}}
+{{< /tabs >}}
To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
or `StreamTableEnvironment.toChangelogStream` will materialize (i.e. compile) the Table API sub-pipeline
@@ -625,6 +691,8 @@ and insert it into the DataStream API pipeline builder. This means that `StreamE
or `DataStream.executeAndCollect` must be called afterwards. An execution in Table API will not trigger
these "external parts".
+{{< tabs "47a32814-abea-11eb-8529-0242ac133505" >}}
+{{< tab "Java" >}}
```java
// (1)
@@ -641,6 +709,25 @@ table.execute().print();
// Flink job, (2) was already running before
env.execute();
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# (1)
+
+# adds a branch with a printing sink to the StreamExecutionEnvironment
+table_env.to_data_stream(table).print()
+
+# (2)
+# executes a Table API end-to-end pipeline as a Flink job and prints locally,
+# thus (1) has still not been executed
+table.execute().print()
+
+# executes the DataStream API pipeline with the sink defined in (1) as a
+# Flink job, (2) was already running before
+env.execute()
+```
+{{< /tab >}}
+{{< /tabs >}}
{{< top >}}
@@ -703,6 +790,24 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+# adopt mode from StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+table_env = StreamTableEnvironment.create(env)
+
+# or
+
+# set mode explicitly for StreamTableEnvironment
+# it will be propagated to StreamExecutionEnvironment during planning
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, EnvironmentSettings.in_batch_mode())
+```
+{{< /tab >}}
{{< /tabs >}}
One must meet the following prerequisites before setting the runtime mode to `BATCH`:
@@ -806,6 +911,35 @@ tableEnv.toDataStream(table)
// ...
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.table import TableDescriptor, Schema, DataTypes
+
+table = table_env.from_descriptor(
+    TableDescriptor.for_connector("datagen")
+        .option("number-of-rows", "10")
+        .schema(
+            Schema.new_builder()
+                .column("uid", DataTypes.TINYINT())
+                .column("payload", DataTypes.STRING())
+                .build())
+        .build())
+
+# convert the Table to a DataStream and further transform the pipeline
+collect = table_env.to_data_stream(table) \
+    .key_by(lambda r: r[0]) \
+    .map(lambda r: "My custom operator: " + r[1]) \
+    .execute_and_collect()
+
+for c in collect:
+    print(c)
+
+# prints:
+# My custom operator: 9660912d30a43c7b035e15bd...
+# My custom operator: 29f5f706d2144f4a4f9f52a0...
+# ...
+```
+{{< /tab >}}
{{< /tabs >}}
### Changelog Unification
@@ -822,6 +956,8 @@ in the resulting changelog stream. The example joins two tables in SQL (`UserTab
an interval join based on the time attributes in both tables (`ts`). It uses DataStream API to implement
a custom operator that deduplicates the user name using a `KeyedProcessFunction` and value state.
+{{< tabs "3f5f5d4e-cd03-48d1-9309-917a6cf66aba" >}}
+{{< tab "Java" >}}
```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
@@ -938,6 +1074,107 @@ env.execute();
// Bob
// Alice
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from datetime import datetime
+from pyflink.common import Row, Types
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, \
+    KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment, Schema, DataTypes
+
+# setup DataStream API
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# use BATCH or STREAMING mode
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+
+# setup Table API
+table_env = StreamTableEnvironment.create(env)
+
+# create a user stream
+t_format = "%Y-%m-%dT%H:%M:%S"
+user_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:00:00", t_format), 1, "Alice"),
+     Row(datetime.strptime("2021-08-21T13:05:00", t_format), 2, "Bob"),
+     Row(datetime.strptime("2021-08-21T13:10:00", t_format), 2, "Bob")],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "name"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.STRING()]))
+
+# create an order stream
+order_stream = env.from_collection(
+    [Row(datetime.strptime("2021-08-21T13:02:00", t_format), 1, 122),
+     Row(datetime.strptime("2021-08-21T13:07:00", t_format), 2, 239),
+     Row(datetime.strptime("2021-08-21T13:11:00", t_format), 2, 999)],
+    type_info=Types.ROW_NAMED(["ts1", "uid", "amount"],
+                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.INT()]))
+
+# create corresponding tables
+table_env.create_temporary_view(
+    "user_table",
+    user_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+table_env.create_temporary_view(
+    "order_table",
+    order_stream,
+    Schema.new_builder()
+        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
+        .column("uid", DataTypes.INT())
+        .column("amount", DataTypes.INT())
+        .watermark("ts", "ts - INTERVAL '1' SECOND")
+        .build())
+
+# perform interval join
+joined_table = table_env.sql_query(
+    "SELECT U.name, O.amount " +
+    "FROM user_table U, order_table O " +
+    "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES")
+
+joined_stream = table_env.to_data_stream(joined_table)
+
+joined_stream.print()
+
+# implement a custom operator using ProcessFunction and value state
+class MyProcessFunction(KeyedProcessFunction):
+
+    def __init__(self):
+        self.seen = None
+
+    def open(self, runtime_context: RuntimeContext):
+        state_descriptor = ValueStateDescriptor("seen", Types.STRING())
+        self.seen = runtime_context.get_state(state_descriptor)
+
+    def process_element(self, value, ctx):
+        name = value[0]
+        if self.seen.value() is None:
+            self.seen.update(name)
+            yield name
+
+joined_stream \
+    .key_by(lambda r: r[0]) \
+    .process(MyProcessFunction()) \
+    .print()
+
+# execute unified pipeline
+env.execute()
+
+# prints (in both BATCH and STREAMING mode):
+# +I[Bob, 239]
+# +I[Alice, 122]
+# +I[Bob, 999]
+#
+# Bob
+# Alice
+```
+{{< /tab >}}
+{{< /tabs >}}
{{< top >}}
@@ -1420,6 +1657,9 @@ data structures. The following example in Java shows what is possible. Check als
[Data Types & Serialization]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}) page of
the DataStream API for more information about the supported types there.
+
+{{< tabs "079cdf25-21ef-4393-ad69-623510038b3b" >}}
+{{< tab "Java" >}}
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
@@ -1492,6 +1732,13 @@ table.printSchema();
// `user` *User<`name` STRING,`score` INT>*
// )
```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+# Custom POJO classes are currently not supported in PyFlink.
+```
+{{< /tab >}}
+{{< /tabs >}}
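The same caveat applies here; a brief hedged sketch (not part of this commit) of the Row-based workaround, assuming `env` and `table_env` from the earlier examples:
```python
# Row with explicit type info stands in for the Java User POJO; illustrative only
from pyflink.common import Row, Types

user_stream = env.from_collection(
    [Row("Alice", 12), Row("Bob", 10)],
    type_info=Types.ROW_NAMED(["name", "score"], [Types.STRING(), Types.INT()]))

table_env.from_data_stream(user_stream).print_schema()
```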
### Examples for `createTemporaryView`
@@ -2639,6 +2886,59 @@ env.execute()
// +I[3]
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+from pyflink.common import Encoder
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import StreamingFileSink
+from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+statement_set = table_env.create_statement_set()
+
+# create some source
+source_descriptor = TableDescriptor.for_connector("datagen") \
+    .option("number-of-rows", "3") \
+    .schema(
+        Schema.new_builder()
+            .column("my_col", DataTypes.INT())
+            .column("my_other_col", DataTypes.BOOLEAN())
+            .build()) \
+    .build()
+
+# create some sink
+sink_descriptor = TableDescriptor.for_connector("print").build()
+
+# add a pure Table API pipeline
+table_from_source = table_env.from_descriptor(source_descriptor)
+statement_set.add_insert(sink_descriptor, table_from_source)
+
+
+# use table sinks for the DataStream API pipeline
+data_stream = env.from_collection([1, 2, 3])
+table_from_stream = table_env.from_data_stream(data_stream)
+statement_set.add_insert(sink_descriptor, table_from_stream)
+
+# define other DataStream API parts
+env.from_collection([4, 5, 6]) \
+    .add_sink(StreamingFileSink
+              .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+              .build())
+
+# use DataStream API to submit the pipelines
+env.execute()
+
+# prints similar to:
+# +I[1618440447, false]
+# +I[1259693645, true]
+# +I[158588930, false]
+# +I[1]
+# +I[2]
+# +I[3]
+```
+{{< /tab >}}
{{< /tabs >}}
{{< top >}}