Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/09 06:42:06 UTC

[flink] branch master updated: [hotfix][python][docs]Fix wrong example code in Python REPL and wrong table plan in table/common page (#9018)

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e4f4470  [hotfix][python][docs]Fix wrong example code in Python REPL and wrong table plan in table/common page (#9018)
e4f4470 is described below

commit e4f44709a56c1a1cdfc2093a7914db7c3b0bbcda
Author: HuangXingBo <hx...@gmail.com>
AuthorDate: Tue Jul 9 14:41:49 2019 +0800

    [hotfix][python][docs]Fix wrong example code in Python REPL and wrong table plan in table/common page (#9018)
---
 docs/dev/table/common.md      | 126 ++++++++++++++++++++++++++++++++++++------
 docs/dev/table/common.zh.md   | 126 ++++++++++++++++++++++++++++++++++++------
 docs/dev/table/tableApi.md    |   8 +--
 docs/dev/table/tableApi.zh.md |   8 +--
 docs/ops/python_shell.md      |  10 ++--
 docs/ops/python_shell.zh.md   |  10 ++--
 6 files changed, 232 insertions(+), 56 deletions(-)
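
For context, the plans corrected below are the output of TableEnvironment.explain(table). A minimal sketch of a program that produces a plan of this shape, assuming the Flink 1.9 Python Table API with the old planner and mirroring the example in docs/dev/table/common.md (field and variable names here follow that doc):

{% highlight python %}
# Minimal sketch, assuming pyflink 1.9: build two tables, filter one,
# union them, and print the three-part plan (abstract syntax tree,
# optimized logical plan, physical execution plan) shown in the
# common.md hunk below.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1.where("LIKE(word, 'F%')").union_all(table2)

# explain() returns the plan as a string rather than printing it itself.
print(t_env.explain(table))
{% endhighlight %}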

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 705a978..5228ff2 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -1372,38 +1372,128 @@ print(explanation)
 </div>
 </div>
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight text %}
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, 'F%')])
-    LogicalTableScan(table=[[_DataStreamTable_0]])
-  LogicalTableScan(table=[[_DataStreamTable_1]])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
+  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
 
 == Optimized Logical Plan ==
-DataStreamUnion(union=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
-    DataStreamScan(table=[[_DataStreamTable_0]])
-  DataStreamScan(table=[[_DataStreamTable_1]])
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    DataStreamScan(id=[1], fields=[count, word])
+  DataStreamScan(id=[2], fields=[count, word])
 
 == Physical Execution Plan ==
 Stage 1 : Data Source
-  content : collect elements with CollectionInputFormat
+	content : collect elements with CollectionInputFormat
 
 Stage 2 : Data Source
-  content : collect elements with CollectionInputFormat
+	content : collect elements with CollectionInputFormat
 
-  Stage 3 : Operator
-    content : from: (count, word)
-    ship_strategy : REBALANCE
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
 
-    Stage 4 : Operator
-      content : where: (LIKE(word, 'F%')), select: (count, word)
-      ship_strategy : FORWARD
+		Stage 4 : Operator
+			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+			ship_strategy : FORWARD
 
-      Stage 5 : Operator
-        content : from: (count, word)
-        ship_strategy : REBALANCE
+			Stage 5 : Operator
+				content : from: (count, word)
+				ship_strategy : REBALANCE
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
+  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
+
+== Optimized Logical Plan ==
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    DataStreamScan(id=[1], fields=[count, word])
+  DataStreamScan(id=[2], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+		Stage 4 : Operator
+			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+			ship_strategy : FORWARD
+
+			Stage 5 : Operator
+				content : from: (count, word)
+				ship_strategy : REBALANCE
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalDataStreamScan(id=[3], fields=[count, word])
+  FlinkLogicalDataStreamScan(id=[6], fields=[count, word])
+
+== Optimized Logical Plan ==
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    DataStreamScan(id=[3], fields=[count, word])
+  DataStreamScan(id=[6], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 2 : Operator
+		content : Flat Map
+		ship_strategy : FORWARD
+
+		Stage 3 : Operator
+			content : Map
+			ship_strategy : FORWARD
+
+Stage 4 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 5 : Operator
+		content : Flat Map
+		ship_strategy : FORWARD
+
+		Stage 6 : Operator
+			content : Map
+			ship_strategy : FORWARD
+
+			Stage 7 : Operator
+				content : Map
+				ship_strategy : FORWARD
+
+				Stage 8 : Operator
+					content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+					ship_strategy : FORWARD
+
+					Stage 9 : Operator
+						content : Map
+						ship_strategy : FORWARD
+{% endhighlight %}
+</div>
+</div>
 
 {% top %}
 
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index 2de8f74..5d6f9ae 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -1372,38 +1372,128 @@ print(explanation)
 </div>
 </div>
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight text %}
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, 'F%')])
-    LogicalTableScan(table=[[_DataStreamTable_0]])
-  LogicalTableScan(table=[[_DataStreamTable_1]])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
+  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
 
 == Optimized Logical Plan ==
-DataStreamUnion(union=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
-    DataStreamScan(table=[[_DataStreamTable_0]])
-  DataStreamScan(table=[[_DataStreamTable_1]])
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    DataStreamScan(id=[1], fields=[count, word])
+  DataStreamScan(id=[2], fields=[count, word])
 
 == Physical Execution Plan ==
 Stage 1 : Data Source
-  content : collect elements with CollectionInputFormat
+	content : collect elements with CollectionInputFormat
 
 Stage 2 : Data Source
-  content : collect elements with CollectionInputFormat
+	content : collect elements with CollectionInputFormat
 
-  Stage 3 : Operator
-    content : from: (count, word)
-    ship_strategy : REBALANCE
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
 
-    Stage 4 : Operator
-      content : where: (LIKE(word, 'F%')), select: (count, word)
-      ship_strategy : FORWARD
+		Stage 4 : Operator
+			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+			ship_strategy : FORWARD
 
-      Stage 5 : Operator
-        content : from: (count, word)
-        ship_strategy : REBALANCE
+			Stage 5 : Operator
+				content : from: (count, word)
+				ship_strategy : REBALANCE
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
+  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
+
+== Optimized Logical Plan ==
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    DataStreamScan(id=[1], fields=[count, word])
+  DataStreamScan(id=[2], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+		Stage 4 : Operator
+			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+			ship_strategy : FORWARD
+
+			Stage 5 : Operator
+				content : from: (count, word)
+				ship_strategy : REBALANCE
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalDataStreamScan(id=[3], fields=[count, word])
+  FlinkLogicalDataStreamScan(id=[6], fields=[count, word])
+
+== Optimized Logical Plan ==
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    DataStreamScan(id=[3], fields=[count, word])
+  DataStreamScan(id=[6], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 2 : Operator
+		content : Flat Map
+		ship_strategy : FORWARD
+
+		Stage 3 : Operator
+			content : Map
+			ship_strategy : FORWARD
+
+Stage 4 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 5 : Operator
+		content : Flat Map
+		ship_strategy : FORWARD
+
+		Stage 6 : Operator
+			content : Map
+			ship_strategy : FORWARD
+
+			Stage 7 : Operator
+				content : Map
+				ship_strategy : FORWARD
+
+				Stage 8 : Operator
+					content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+					ship_strategy : FORWARD
+
+					Stage 9 : Operator
+						content : Map
+						ship_strategy : FORWARD
+{% endhighlight %}
+</div>
+</div>
 
 {% top %}
 
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 744a82c..8dc33e1 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -923,10 +923,10 @@ result = orders.window(Tumble.over("5.minutes").on("rowtime").alias("w")) \
        <p>Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the <a href="#over-windows">over windows section</a> for more details.</p>
 {% highlight python %}
 orders = table_env.scan("Orders")
-result = orders.over_window(Over.partition_by("a").order_by("rowtime") \
-      	       .preceding("UNBOUNDED_RANGE").following("CURRENT_RANGE") \
-               .alias("w")) \
-               .select("a, b.avg over w, b.max over w, b.min over w")
+result = orders.over_window(Over.partition_by("a").order_by("rowtime")
+                            .preceding("UNBOUNDED_RANGE").following("CURRENT_RANGE")
+                            .alias("w")) \
+    .select("a, b.avg over w, b.max over w, b.min over w")
 {% endhighlight %}
        <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming/time_attributes.html">time attribute</a>.</p>
       </td>
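
A note on the change itself: the removed backslashes sit inside the parentheses of over_window(...), where Python already joins lines implicitly, so they were redundant, and the mixed tab/space indentation rendered poorly; the fix keeps an explicit backslash only for the statement-level .select(...) chain. A plain-Python illustration of the rule, independent of Flink:

{% highlight python %}
# Inside parentheses (or brackets) the interpreter continues lines
# implicitly, so no trailing backslash is needed:
total = (1 + 2
         + 3)
print(total)  # 6

# An explicit backslash continuation must be the very last character on
# the line; any trailing whitespace after it is a SyntaxError, which is
# what makes backslash-heavy snippets fragile to copy-paste into a REPL.
{% endhighlight %}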
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index e4b8a55..409a7b4 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -922,10 +922,10 @@ result = orders.window(Tumble.over("5.minutes").on("rowtime").alias("w")) \
        <p>类似于SQL中的OVER开窗函数。Over窗口聚合对每一行都进行一次聚合计算,聚合的对象是以当前行的位置为基准,向前向后取一个区间范围内的所有数据。详情请见<a href="#over-windows">Over窗口</a>一节。</p>
 {% highlight python %}
 orders = table_env.scan("Orders")
-result = orders.over_window(Over.partition_by("a").order_by("rowtime") \
-      	       .preceding("UNBOUNDED_RANGE").following("CURRENT_RANGE") \
-               .alias("w")) \
-               .select("a, b.avg over w, b.max over w, b.min over w")
+result = orders.over_window(Over.partition_by("a").order_by("rowtime")
+                            .preceding("UNBOUNDED_RANGE").following("CURRENT_RANGE")
+                            .alias("w")) \
+    .select("a, b.avg over w, b.max over w, b.min over w")
 {% endhighlight %}
        <p><b>注意:</b> 所有的聚合操作必须在同一个窗口上定义,即分组,排序,范围等属性必须一致。目前,窗口区间范围的向前(PRECEDING)取值没有限制,可以为无界(UNBOUNDED),但是向后(FOLLOWING)只支持当前行(CURRENT ROW),其它向后范围取值暂不支持。排序(ORDER BY)属性必须指定单个<a href="streaming/time_attributes.html">时间属性</a>。</p>
       </td>
diff --git a/docs/ops/python_shell.md b/docs/ops/python_shell.md
index 9be6fc4..a52cd23 100644
--- a/docs/ops/python_shell.md
+++ b/docs/ops/python_shell.md
@@ -55,8 +55,7 @@ The example below is a simple program in the Python shell:
 ...         os.remove(sink_path)
 ...     else:
 ...         shutil.rmtree(sink_path)
->>> st_config = TableConfig.Builder().set_parallelism(1).as_streaming_execution().build()
->>> st_env = TableEnvironment.create(st_config)
+>>> s_env.set_parallelism(1)
 >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
 >>> st_env.connect(FileSystem().path(sink_path))\
 ...     .with_format(OldCsv()
@@ -71,7 +70,7 @@ The example below is a simple program in the Python shell:
 ...     .register_table_sink("stream_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("stream_sink")
->>> st_env.execute()
+>>> s_env.execute()
 >>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
@@ -88,8 +87,7 @@ The example below is a simple program in the Python shell:
 ...         os.remove(sink_path)
 ...     else:
 ...         shutil.rmtree(sink_path)
->>> bt_config = TableConfig.Builder().set_parallelism(1).as_batch_execution().build()
->>> bt_env = TableEnvironment.create(bt_config)
+>>> b_env.set_parallelism(1)
 >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
 >>> bt_env.connect(FileSystem().path(sink_path))\
 ...     .with_format(OldCsv()
@@ -104,7 +102,7 @@ The example below is a simple program in the Python shell:
 ...     .register_table_sink("batch_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("batch_sink")
->>> bt_env.execute()
+>>> b_env.execute()
 >>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
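
Put together, the corrected example relies on the variables that bin/pyflink-shell.sh pre-binds (s_env/st_env for streaming, b_env/bt_env for batch) instead of TableConfig.Builder, which does not exist in the 1.9 Python API: parallelism is set on the execution environment, and the job is submitted through it as well. A sketch of the full corrected streaming flow, assuming the shell's pre-bound variables and pre-imported descriptor classes (the sink path and CSV fields here follow the surrounding doc example):

{% highlight python %}
import os
import shutil
import tempfile

# Assumes the Flink 1.9 Python shell, which pre-binds s_env / st_env and
# pre-imports FileSystem, OldCsv, Schema and DataTypes.
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
    if os.path.isfile(sink_path):
        os.remove(sink_path)
    else:
        shutil.rmtree(sink_path)

s_env.set_parallelism(1)  # replaces the removed TableConfig.Builder()

t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
                         ['a', 'b', 'c'])
st_env.connect(FileSystem().path(sink_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("a", DataTypes.BIGINT())
                 .field("b", DataTypes.STRING())
                 .field("c", DataTypes.STRING())) \
    .with_schema(Schema()
                 .field("a", DataTypes.BIGINT())
                 .field("b", DataTypes.STRING())
                 .field("c", DataTypes.STRING())) \
    .register_table_sink("stream_sink")

t.select("a + 1, b, c").insert_into("stream_sink")
s_env.execute()  # the job is submitted via the execution environment

# The batch shell follows the same pattern with b_env / bt_env.
{% endhighlight %}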
diff --git a/docs/ops/python_shell.zh.md b/docs/ops/python_shell.zh.md
index db30530..90440d8 100644
--- a/docs/ops/python_shell.zh.md
+++ b/docs/ops/python_shell.zh.md
@@ -54,8 +54,7 @@ bin/pyflink-shell.sh local
 ...         os.remove(sink_path)
 ...     else:
 ...         shutil.rmtree(sink_path)
->>> st_config = TableConfig.Builder().set_parallelism(1).as_streaming_execution().build()
->>> st_env = TableEnvironment.create(st_config)
+>>> s_env.set_parallelism(1)
 >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
 >>> st_env.connect(FileSystem().path(sink_path))\
 ...     .with_format(OldCsv()
@@ -70,7 +69,7 @@ bin/pyflink-shell.sh local
 ...     .register_table_sink("stream_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("stream_sink")
->>> st_env.execute()
+>>> s_env.execute()
 >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
@@ -87,8 +86,7 @@ bin/pyflink-shell.sh local
 ...         os.remove(sink_path)
 ...     else:
 ...         shutil.rmtree(sink_path)
->>> bt_config = TableConfig.Builder().set_parallelism(1).as_batch_execution().build()
->>> bt_env = TableEnvironment.create(bt_config)
+>>> b_env.set_parallelism(1)
 >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
 >>> bt_env.connect(FileSystem().path(sink_path))\
 ...     .with_format(OldCsv()
@@ -103,7 +101,7 @@ bin/pyflink-shell.sh local
 ...     .register_table_sink("batch_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("batch_sink")
->>> bt_env.execute()
+>>> b_env.execute()
 >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())