Posted to commits@flink.apache.org by di...@apache.org on 2022/07/07 11:27:22 UTC

[flink] branch release-1.15 updated: [FLINK-28140][python][docs] Improve the documentation by adding Python examples in Time Attributes page (#20194)

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e75bfae36a1 [FLINK-28140][python][docs] Improve the documentation by adding Python examples in Time Attributes page (#20194)
e75bfae36a1 is described below

commit e75bfae36a1bd5752628353b95a86a7f665e9134
Author: pengmide <97...@users.noreply.github.com>
AuthorDate: Thu Jul 7 19:23:25 2022 +0800

    [FLINK-28140][python][docs] Improve the documentation by adding Python examples in Time Attributes page (#20194)
---
 .../docs/dev/table/concepts/time_attributes.md     | 34 ++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/docs/content/docs/dev/table/concepts/time_attributes.md b/docs/content/docs/dev/table/concepts/time_attributes.md
index dd2d895e84b..7febf8d31a7 100644
--- a/docs/content/docs/dev/table/concepts/time_attributes.md
+++ b/docs/content/docs/dev/table/concepts/time_attributes.md
@@ -164,6 +164,30 @@ val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name
 val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+
+# Option 1:
+
+# extract timestamp and assign watermarks based on knowledge of the stream
+stream = input_stream.assign_timestamps_and_watermarks(...)
+
+table = t_env.from_data_stream(stream, col('user_name'), col('data'), col('user_action_time').rowtime)
+
+# Option 2:
+
+# extract timestamp from first field, and assign watermarks based on knowledge of the stream
+stream = input_stream.assign_timestamps_and_watermarks(...)
+
+# the first field has been used for timestamp extraction, and is no longer necessary
+# replace first field with a logical event time attribute
+table = t_env.from_data_stream(stream, col("user_action_time").rowtime, col('user_name'), col('data'))
+
+# Usage:
+
+table.window(Tumble.over(lit(10).minutes).on(col("user_action_time")).alias("userActionWindow"))
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 
@@ -222,4 +246,14 @@ val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"
 val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+stream = ...
+
+# declare an additional logical field as a processing time attribute
+table = t_env.from_data_stream(stream, col("UserActionTimestamp"), col("user_name"), col("data"), col("user_action_time").proctime)
+
+windowed_table = table.window(Tumble.over(lit(10).minutes).on(col("user_action_time")).alias("userActionWindow"))
+```
+{{< /tab >}}
 {{< /tabs >}}
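
Note on the added examples: both Python tabs end with `Tumble.over(lit(10).minutes)`, which groups rows into fixed, non-overlapping 10-minute windows on the time attribute. The sketch below illustrates that grouping in plain Python, without PyFlink. The names `window_start` and `assign_tumbling_windows` are hypothetical helpers introduced here, and timestamps are assumed to be epoch milliseconds. It is not Flink's implementation and ignores watermarks, late data, and window offsets.

```python
from collections import defaultdict

# 10 minutes in milliseconds, mirroring lit(10).minutes in the examples above.
WINDOW_SIZE_MS = 10 * 60 * 1000


def window_start(timestamp_ms, size_ms=WINDOW_SIZE_MS):
    """Return the start of the tumbling window [start, start + size) containing the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


def assign_tumbling_windows(rows, size_ms=WINDOW_SIZE_MS):
    """Group (user_action_time, user_name, data) rows by the start of their window."""
    windows = defaultdict(list)
    for row in rows:
        windows[window_start(row[0], size_ms)].append(row)
    return dict(windows)


# Hypothetical sample rows: (user_action_time in ms, user_name, data).
rows = [
    (0, "alice", "a"),
    (599_999, "bob", "b"),      # last millisecond of the first window
    (600_000, "carol", "c"),    # first millisecond of the second window
]
windows = assign_tumbling_windows(rows)
```

Here the first two rows share the window starting at 0 and the third opens a new window at 600000 ms. In Flink, the event-time variant additionally waits for the watermark to pass the window end before emitting results.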