You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/03/22 02:18:48 UTC
[flink] 01/02: [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 50bfbdbbf1fe817d33aca3a675904ce682203a12
Author: zoucao <zh...@hotmail.com>
AuthorDate: Fri Mar 4 14:24:03 2022 +0800
[FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
---
docs/content.zh/docs/dev/table/common.md | 36 ++++++++++++++++-------
docs/content.zh/docs/dev/table/data_stream_api.md | 14 ++++-----
docs/content.zh/docs/dev/table/tableApi.md | 8 +++--
docs/content/docs/dev/table/common.md | 4 +--
4 files changed, 39 insertions(+), 23 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md
index a1d5539..dddac51 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -65,7 +65,7 @@ Table table2 = tableEnv.from("SourceTable");
Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
// Emit a Table API result Table to a TableSink, same for SQL result
-TableResult tableResult = table2.executeInsert("SinkTable");
+TableResult tableResult = table2.insertInto("SinkTable").execute();
```
{{< /tab >}}
{{< tab "Scala" >}}
@@ -95,7 +95,7 @@ val table1 = tableEnv.from("SourceTable")
val table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable")
// Emit a Table API result Table to a TableSink, same for SQL result
-val tableResult = table1.executeInsert("SinkTable")
+val tableResult = table1.insertInto("SinkTable").execute()
```
{{< /tab >}}
{{< tab "Python" >}}
@@ -645,7 +645,9 @@ Table API 和 SQL 查询的混用非常简单因为它们都返回 `Table` 对
请参考文档 [Table Sources & Sinks]({{< ref "docs/dev/table/sourcesSinks" >}}) 以获取更多关于可用 Sink 的信息以及如何自定义 `DynamicTableSink`。
-方法 `Table.executeInsert(String tableName)` 将 `Table` 发送至已注册的 `TableSink`。该方法通过名称在 catalog 中查找 `TableSink` 并确认`Table` schema 和 `TableSink` schema 一致。
+方法 `Table.insertInto(String tableName)` 定义了一个完整的端到端管道将源表中的数据传输到一个被注册的输出表中。
+该方法通过名称在 catalog 中查找输出表并确认 `Table` schema 和输出表 schema 一致。
+可以通过方法 `TablePipeline.explain()` 和 `TablePipeline.execute()` 分别来解释和执行一个数据流管道。
下面的示例演示如何输出 `Table`:
@@ -673,8 +675,14 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
+// Prepare the insert into pipeline
+TablePipeline pipeline = result.insertInto("CsvSinkTable");
+
+// Print explain details
+pipeline.printExplain();
+
// emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable");
+pipeline.execute();
```
{{< /tab >}}
@@ -701,8 +709,14 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
+// Prepare the insert into pipeline
+val pipeline = result.insertInto("CsvSinkTable")
+
+// Print explain details
+pipeline.printExplain()
+
// emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable")
+pipeline.execute()
```
{{< /tab >}}
@@ -752,9 +766,9 @@ result.execute_insert("CsvSinkTable")
Table API 或者 SQL 查询在下列情况下会被翻译:
* 当 `TableEnvironment.executeSql()` 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
-* 当 `Table.executeInsert()` 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
+* 当 `TablePipeline.execute()` 被调用时。该方法是用来执行一个源表到输出表的数据流,一旦该方法被调用, TABLE API 程序立即被翻译。
* 当 `Table.execute()` 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
-* 当 `StatementSet.execute()` 被调用时。`Table` (通过 `StatementSet.addInsert()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
+* 当 `StatementSet.execute()` 被调用时。`TablePipeline` (通过 `StatementSet.add()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
* 当 `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 集成](#integration-with-datastream))。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 `StreamExecutionEnvironment.execute()` 时被执行。
{{< top >}}
@@ -910,10 +924,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
StatementSet stmtSet = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-stmtSet.addInsert("MySink1", table1);
+stmtSet.add(table1.insertInto("MySink1"));
Table table2 = table1.unionAll(tEnv.from("MySource2"));
-stmtSet.addInsert("MySink2", table2);
+stmtSet.add(table2.insertInto("MySink2"));
String explanation = stmtSet.explain();
System.out.println(explanation);
@@ -954,10 +968,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
val stmtSet = tEnv.createStatementSet()
val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-stmtSet.addInsert("MySink1", table1)
+stmtSet.add(table1.insertInto("MySink1"))
val table2 = table1.unionAll(tEnv.from("MySource2"))
-stmtSet.addInsert("MySink2", table2)
+stmtSet.add(table2.insertInto("MySink2"))
val explanation = stmtSet.explain()
println(explanation)
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md
index 3c91213..f04c561 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -598,13 +598,13 @@ pipeline or a statement set:
```java
// execute with explicit sink
-tableEnv.from("InputTable").executeInsert("OutputTable")
+tableEnv.from("InputTable").insertInto("OutputTable").execute()
tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
tableEnv.createStatementSet()
- .addInsert("OutputTable", tableEnv.from("InputTable"))
- .addInsert("OutputTable2", tableEnv.from("InputTable"))
+ .add(tableEnv.from("InputTable").insertInto("OutputTable"))
+ .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
.execute()
tableEnv.createStatementSet()
@@ -2562,12 +2562,12 @@ TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();
// add a pure Table API pipeline
Table tableFromSource = tableEnv.from(sourceDescriptor);
-statementSet.addInsert(sinkDescriptor, tableFromSource);
+statementSet.add(tableFromSource.insertInto(sinkDescriptor));
// use table sinks for the DataStream API pipeline
DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
Table tableFromStream = tableEnv.fromDataStream(dataStream);
-statementSet.addInsert(sinkDescriptor, tableFromStream);
+statementSet.add(tableFromStream.insertInto(sinkDescriptor));
// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared after calling this method)
@@ -2613,12 +2613,12 @@ val sinkDescriptor = TableDescriptor.forConnector("print").build
// add a pure Table API pipeline
val tableFromSource = tableEnv.from(sourceDescriptor)
-statementSet.addInsert(sinkDescriptor, tableFromSource)
+statementSet.add(tableFromSource.insertInto(sinkDescriptor))
// use table sinks for the DataStream API pipeline
val dataStream = env.fromElements(1, 2, 3)
val tableFromStream = tableEnv.fromDataStream(dataStream)
-statementSet.addInsert(sinkDescriptor, tableFromStream)
+statementSet.add(tableFromStream.insertInto(sinkDescriptor))
// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared calling this method)
diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md
index 860464a..65a9f9d 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -1455,7 +1455,9 @@ result3 = table.order_by(table.a.asc).offset(10).fetch(5)
{{< label Batch >}} {{< label Streaming >}}
-和 SQL 查询中的 `INSERT INTO` 子句类似,该方法执行对已注册的输出表的插入操作。`executeInsert()` 方法将立即提交执行插入操作的 Flink job。
+和 SQL 查询中的 `INSERT INTO` 子句类似,该方法执行对已注册的输出表的插入操作。
+`insertInto()` 方法会将 `INSERT INTO` 转换为一个 `TablePipeline`。
+该数据流可以用 `TablePipeline.explain()` 来解释,用 `TablePipeline.execute()` 来执行。
输出表必须已注册在 TableEnvironment(详见表连接器)中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。
@@ -1463,13 +1465,13 @@ result3 = table.order_by(table.a.asc).offset(10).fetch(5)
{{< tab "Java" >}}
```java
Table orders = tableEnv.from("Orders");
-orders.executeInsert("OutOrders");
+orders.insertInto("OutOrders").execute();
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val orders = tableEnv.from("Orders")
-orders.executeInsert("OutOrders")
+orders.insertInto("OutOrders").execute()
```
{{< /tab >}}
{{< tab "Python" >}}
diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md
index 5a783ad..0f8b309 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -921,10 +921,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
StatementSet stmtSet = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-stmtSet.add(table1.insertInto("MySink1");
+stmtSet.add(table1.insertInto("MySink1"));
Table table2 = table1.unionAll(tEnv.from("MySource2"));
-stmtSet.add(table2.insertInto("MySink2");
+stmtSet.add(table2.insertInto("MySink2"));
String explanation = stmtSet.explain();
System.out.println(explanation);