Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/01 09:07:20 UTC

[flink] branch master updated: [hotfix][docs][table] Document the new `TablePipeline` API object

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f65c79  [hotfix][docs][table] Document the new `TablePipeline` API object
4f65c79 is described below

commit 4f65c7950f2c3ef849f2094deab0e199ffedf57b
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Wed Feb 16 14:56:21 2022 +0100

    [hotfix][docs][table] Document the new `TablePipeline` API object
    
    This closes #18804.
---
 docs/content/docs/dev/table/common.md              | 39 ++++++++++++++--------
 docs/content/docs/dev/table/data_stream_api.md     | 14 ++++----
 docs/content/docs/dev/table/tableApi.md            |  8 +++--
 .../apache/flink/table/api/TableDescriptor.java    |  2 +-
 4 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md
index e661626..5a783ad 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -63,7 +63,7 @@ Table table2 = tableEnv.from("SourceTable");
 Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
 
 // Emit a Table API result Table to a TableSink, same for SQL result
-TableResult tableResult = table2.executeInsert("SinkTable");
+TableResult tableResult = table2.insertInto("SinkTable").execute();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -93,7 +93,7 @@ val table1 = tableEnv.from("SourceTable")
 val table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable")
 
 // Emit a Table API result Table to a TableSink, same for SQL result
-val tableResult = table1.executeInsert("SinkTable")
+val tableResult = table1.insertInto("SinkTable").execute()
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -644,7 +644,9 @@ A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Ta
 
 Please see the documentation about [Table Sources & Sinks]({{< ref "docs/dev/table/sourcesSinks" >}}) for details about available sinks and instructions for how to implement a custom `DynamicTableSink`.
 
-The `Table.executeInsert(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` from the catalog by the name and validates that the schema of the `Table` is identical to the schema of the `TableSink`. 
+The `Table.insertInto(String tableName)` method defines a complete end-to-end pipeline emitting the source table to a registered sink table.
+The method looks up the table sink in the catalog by name and validates that the schema of the `Table` is identical to the schema of the sink.
+The pipeline can be explained with `TablePipeline.explain()` and executed by invoking `TablePipeline.execute()`.
 
 The following example shows how to emit a `Table`:
 
@@ -672,9 +674,14 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
 // compute a result Table using Table API operators and/or SQL queries
 Table result = ...
 
-// emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable");
+// Prepare the insert into pipeline
+TablePipeline pipeline = result.insertInto("CsvSinkTable");
+
+// Print explain details
+pipeline.printExplain();
 
+// emit the result Table to the registered TableSink
+pipeline.execute();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -700,8 +707,14 @@ tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("file
 // compute a result Table using Table API operators and/or SQL queries
 val result: Table = ...
 
+// Prepare the insert into pipeline
+val pipeline = result.insertInto("CsvSinkTable")
+
+// Print explain details
+pipeline.printExplain()
+
 // emit the result Table to the registered TableSink
-result.executeInsert("CsvSinkTable")
+pipeline.execute()
 
 ```
 {{< /tab >}}
@@ -750,9 +763,9 @@ A query is internally represented as a logical query plan and is translated in t
 A Table API or SQL query is translated when:
 
 * `TableEnvironment.executeSql()` is called. This method is used for executing a given statement, and the sql query is translated immediately once this method is called.
-* `Table.executeInsert()` is called. This method is used for inserting the table content to the given sink path, and the Table API is translated immediately once this method is called.
-* `Table.execute()` is called. This method is used for collecting the table content to local client, and the Table API is translated immediately once this method is called.
-* `StatementSet.execute()` is called. A `Table` (emitted to a sink through `StatementSet.addInsert()`) or an INSERT statement (specified through `StatementSet.addInsertSql()`) will be buffered in `StatementSet` first. They are translated once `StatementSet.execute()` is called. All sinks will be optimized into one DAG.
+* `TablePipeline.execute()` is called. This method is used for executing a source-to-sink pipeline, and the Table API program is translated immediately once this method is called.
+* `Table.execute()` is called. This method is used for collecting the table content to the local client, and the Table API program is translated immediately once this method is called.
+* `StatementSet.execute()` is called. A `TablePipeline` (emitted to a sink through `StatementSet.add()`) or an INSERT statement (specified through `StatementSet.addInsertSql()`) will be buffered in `StatementSet` first. They are translated once `StatementSet.execute()` is called. All sinks will be optimized into one DAG.
 * A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream](#integration-with-datastream)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.
 
 {{< top >}}
@@ -908,10 +921,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
 StatementSet stmtSet = tEnv.createStatementSet();
 
 Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-stmtSet.addInsert("MySink1", table1);
+stmtSet.add(table1.insertInto("MySink1"));
 
 Table table2 = table1.unionAll(tEnv.from("MySource2"));
-stmtSet.addInsert("MySink2", table2);
+stmtSet.add(table2.insertInto("MySink2"));
 
 String explanation = stmtSet.explain();
 System.out.println(explanation);
@@ -952,10 +965,10 @@ tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
 val stmtSet = tEnv.createStatementSet()
 
 val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-stmtSet.addInsert("MySink1", table1)
+stmtSet.add(table1.insertInto("MySink1"))
 
 val table2 = table1.unionAll(tEnv.from("MySource2"))
-stmtSet.addInsert("MySink2", table2)
+stmtSet.add(table2.insertInto("MySink2"))
 
 val explanation = stmtSet.explain()
 println(explanation)
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md
index 04fbffb..7506149 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -598,13 +598,13 @@ pipeline or a statement set:
 
 ```java
 // execute with explicit sink
-tableEnv.from("InputTable").executeInsert("OutputTable")
+tableEnv.from("InputTable").insertInto("OutputTable").execute()
 
 tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
 
 tableEnv.createStatementSet()
-    .addInsert("OutputTable", tableEnv.from("InputTable"))
-    .addInsert("OutputTable2", tableEnv.from("InputTable"))
+    .add(tableEnv.from("InputTable").insertInto("OutputTable"))
+    .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
     .execute()
 
 tableEnv.createStatementSet()
@@ -2562,12 +2562,12 @@ TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();
 
 // add a pure Table API pipeline
 Table tableFromSource = tableEnv.from(sourceDescriptor);
-statementSet.addInsert(sinkDescriptor, tableFromSource);
+statementSet.add(tableFromSource.insertInto(sinkDescriptor));
 
 // use table sinks for the DataStream API pipeline
 DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
 Table tableFromStream = tableEnv.fromDataStream(dataStream);
-statementSet.addInsert(sinkDescriptor, tableFromStream);
+statementSet.add(tableFromStream.insertInto(sinkDescriptor));
 
 // attach both pipelines to StreamExecutionEnvironment
 // (the statement set will be cleared after calling this method)
@@ -2613,12 +2613,12 @@ val sinkDescriptor = TableDescriptor.forConnector("print").build
 
 // add a pure Table API pipeline
 val tableFromSource = tableEnv.from(sourceDescriptor)
-statementSet.addInsert(sinkDescriptor, tableFromSource)
+statementSet.add(tableFromSource.insertInto(sinkDescriptor))
 
 // use table sinks for the DataStream API pipeline
 val dataStream = env.fromElements(1, 2, 3)
 val tableFromStream = tableEnv.fromDataStream(dataStream)
-statementSet.addInsert(sinkDescriptor, tableFromStream)
+statementSet.add(tableFromStream.insertInto(sinkDescriptor))
 
 // attach both pipelines to StreamExecutionEnvironment
 // (the statement set will be cleared after calling this method)
diff --git a/docs/content/docs/dev/table/tableApi.md b/docs/content/docs/dev/table/tableApi.md
index cafb78b..d4f132c 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -1454,7 +1454,9 @@ result3 = table.order_by(table.a.asc).offset(10).fetch(5)
 
 {{< label Batch >}} {{< label Streaming >}}
 
-Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method will immediately submit a Flink job which execute the insert operation.
+Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table.
+The `insertInto()` method turns the insert operation into a `TablePipeline`.
+The pipeline can be explained with `TablePipeline.explain()` and executed with `TablePipeline.execute()`.
 
 Output tables must be registered in the TableEnvironment (see Connector tables). Moreover, the schema of the registered table must match the schema of the query.
 
@@ -1462,13 +1464,13 @@ Output tables must be registered in the TableEnvironment (see Connector tables).
 {{< tab "Java" >}}
 ```java
 Table orders = tableEnv.from("Orders");
-orders.executeInsert("OutOrders");
+orders.insertInto("OutOrders").execute();
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
 val orders = tableEnv.from("Orders")
-orders.executeInsert("OutOrders")
+orders.insertInto("OutOrders").execute()
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableDescriptor.java
index ccf3c7d..d8f295d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableDescriptor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableDescriptor.java
@@ -207,7 +207,7 @@ public class TableDescriptor {
          * Define the schema of the {@link TableDescriptor}.
          *
          * <p>The schema is typically required. It is optional only in cases where the schema can be
-         * inferred, e.g. {@link Table#executeInsert(TableDescriptor)}.
+         * inferred, e.g. {@link Table#insertInto(TableDescriptor)}.
          */
         public Builder schema(@Nullable Schema schema) {
             this.schema = schema;