You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/20 08:13:18 UTC
[3/4] flink git commit: [FLINK-6442] [table] Add registration for
TableSinks and INSERT INTO support for SQL and Table API.
[FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.
This closes #3829.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb37cb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb37cb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb37cb9
Branch: refs/heads/master
Commit: 2cb37cb937c6f225ad7afe829a28a6eda043ffc1
Parents: df5efe9
Author: lincoln-lil <li...@gmail.com>
Authored: Thu May 4 17:52:34 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 20 10:12:13 2017 +0200
----------------------------------------------------------------------
docs/dev/table/common.md | 153 ++++++++++++--
docs/dev/table/sql.md | 83 ++++++--
docs/dev/table/tableApi.md | 105 ++++++++--
docs/dev/table/udfs.md | 20 +-
.../addons/hbase/HBaseConnectorITCase.java | 8 +-
.../flink/table/examples/java/WordCountSQL.java | 2 +-
.../table/examples/scala/StreamSQLExample.scala | 2 +-
.../table/examples/scala/WordCountSQL.scala | 2 +-
.../flink/table/api/BatchTableEnvironment.scala | 50 ++++-
.../table/api/StreamTableEnvironment.scala | 60 +++++-
.../flink/table/api/TableEnvironment.scala | 202 +++++++++++++++++--
.../apache/flink/table/api/queryConfig.scala | 1 +
.../org/apache/flink/table/api/table.scala | 41 +++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 6 +-
.../table/plan/schema/TableSinkTable.scala | 45 +++++
.../runtime/batch/JavaTableSourceITCase.java | 2 +-
.../runtime/batch/sql/GroupingSetsITCase.java | 6 +-
.../table/runtime/batch/sql/JavaSqlITCase.java | 12 +-
.../table/runtime/stream/sql/JavaSqlITCase.java | 8 +-
.../api/batch/BatchTableEnvironmentTest.scala | 4 +-
.../sql/validation/CalcValidationTest.scala | 2 +-
.../validation/InsertIntoValidationTest.scala | 77 +++++++
.../sql/validation/JoinValidationTest.scala | 22 +-
.../validation/OverWindowValidationTest.scala | 4 +-
.../sql/validation/SortValidationTest.scala | 2 +-
.../validation/InsertIntoValidationTest.scala | 61 ++++++
.../api/stream/StreamTableEnvironmentTest.scala | 4 +-
.../flink/table/api/stream/sql/JoinTest.scala | 4 +-
.../validation/InsertIntoValidationTest.scala | 87 ++++++++
.../validation/OverWindowValidationTest.scala | 6 +-
.../validation/CorrelateValidationTest.scala | 6 +-
.../validation/InsertIntoValidationTest.scala | 68 +++++++
.../validation/TableSinksValidationTest.scala | 27 ++-
.../expressions/utils/ExpressionTestBase.scala | 1 -
.../plan/ExpressionReductionRulesTest.scala | 4 +-
.../flink/table/plan/RetractionRulesTest.scala | 2 +-
.../plan/TimeIndicatorConversionTest.scala | 8 +-
.../runtime/batch/sql/AggregateITCase.scala | 34 ++--
.../table/runtime/batch/sql/CalcITCase.scala | 26 +--
.../table/runtime/batch/sql/JoinITCase.scala | 44 ++--
.../runtime/batch/sql/SetOperatorsITCase.scala | 24 +--
.../table/runtime/batch/sql/SortITCase.scala | 8 +-
.../batch/sql/TableEnvironmentITCase.scala | 35 +++-
.../runtime/batch/sql/TableSourceITCase.scala | 4 +-
.../table/runtime/batch/table/CalcITCase.scala | 6 +-
.../batch/table/TableEnvironmentITCase.scala | 24 +++
.../runtime/stream/TimeAttributesITCase.scala | 4 +-
.../table/runtime/stream/sql/JoinITCase.scala | 4 +-
.../runtime/stream/sql/OverWindowITCase.scala | 30 +--
.../table/runtime/stream/sql/SortITCase.scala | 2 +-
.../table/runtime/stream/sql/SqlITCase.scala | 59 ++++--
.../runtime/stream/sql/TableSourceITCase.scala | 2 +-
.../runtime/stream/table/TableSinkITCase.scala | 32 ++-
.../flink/table/utils/MemoryTableSinkUtil.scala | 84 ++++++++
.../table/utils/MockTableEnvironment.scala | 6 +
.../flink/table/utils/TableTestBase.scala | 8 +-
56 files changed, 1371 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/common.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index fed2b6d..acd5711 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -50,7 +50,7 @@ tableEnv.registerExternalCatalog("extCat", ...);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
-Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... ");
+Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);
@@ -77,7 +77,7 @@ tableEnv.registerExternalCatalog("extCat", ...)
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
-val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
+val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)
@@ -149,18 +149,18 @@ val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
{% top %}
-Register a Table in the Catalog
+Register Tables in the Catalog
-------------------------------
-A `TableEnvironment` has an internal catalog of tables, organized by table name. Table API or SQL queries can access tables which are registered in the catalog, by referencing them by name.
+A `TableEnvironment` maintains a catalog of tables which are registered by name. There are two types of tables, *input tables* and *output tables*. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.
-A `TableEnvironment` allows you to register a table from various sources:
+An input table can be registered from various sources:
* an existing `Table` object, usually the result of a Table API or SQL query.
* a `TableSource`, which accesses external data, such as a file, database, or messaging system.
-* a `DataStream` or `DataSet` from a DataStream or DataSet program.
+* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
-Registering a `DataStream` or `DataSet` as a table is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
+An output table can be registered using a `TableSink`.
### Register a Table
@@ -200,7 +200,7 @@ tableEnv.registerTable("projectedTable", projTable)
### Register a TableSource
-A `TableSource` provides access to external data which is stored in a storage systems such as a database (MySQL, HBase, ...), a file with specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).
+A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).
Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`.
@@ -236,6 +236,52 @@ tableEnv.registerTableSource("CsvTable", csvSource)
{% top %}
+### Register a TableSink
+
+A registered `TableSink` can be used to [emit the result of a Table API or SQL query](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache \[Parquet, Avro, ORC\], ...).
+
+Flink aims to provide TableSinks for common data formats and storage systems. Please see the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for details about available sinks and instructions for how to implement a custom `TableSink`.
+
+A `TableSink` is registered in a `TableEnvironment` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// create a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+
+// define the field names and types
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// create a TableSink
+val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
+
+// define the field names and types
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
Register an External Catalog
----------------------------
@@ -342,7 +388,7 @@ Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org)
The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables.
-The following example shows how to specify a query and return the result as a Table.
+The following example shows how to specify a query and return the result as a `Table`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -353,7 +399,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// compute revenue for all customers from France
-Table revenue = tableEnv.sql(
+Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
@@ -373,7 +419,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table
// compute revenue for all customers from France
-Table revenue = tableEnv.sql("""
+val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
@@ -387,6 +433,53 @@ Table revenue = tableEnv.sql("""
</div>
</div>
+The following example shows how to specify an update query that inserts its result into a registered table.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate(
+ "INSERT INTO RevenueFrance " +
+ "SELECT cID, cName, SUM(revenue) AS revSum " +
+ "FROM Orders " +
+ "WHERE cCountry = 'FRANCE' " +
+ "GROUP BY cID, cName"
+ );
+
+// execute query
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate("""
+ |INSERT INTO RevenueFrance
+ |SELECT cID, cName, SUM(revenue) AS revSum
+ |FROM Orders
+ |WHERE cCountry = 'FRANCE'
+ |GROUP BY cID, cName
+ """.stripMargin)
+
+// execute query
+{% endhighlight %}
+
+</div>
+</div>
+
{% top %}
### Mixing Table API and SQL
@@ -401,12 +494,19 @@ Table API and SQL queries can be easily mixed because both return `Table` object
Emit a Table
------------
-In order to emit a `Table`, it can be written to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
+A `Table` is emitted by writing it to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
-A batch `Table` can only be written to a `BatchTableSink`, while a streaming table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`.
+A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Table` requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`.
Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
+There are two ways to emit a table:
+
+1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
+2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
+
+The following examples show how to emit a `Table`:
+
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
@@ -419,9 +519,18 @@ Table result = ...
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
-// write the result Table to the TableSink
+// METHOD 1:
+// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink);
+// METHOD 2:
+// Register the TableSink with a specific schema
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
+// Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable");
+
// execute the program
{% endhighlight %}
</div>
@@ -437,9 +546,18 @@ val result: Table = ...
// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
-// write the result Table to the TableSink
+// METHOD 1:
+// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink)
+// METHOD 2:
+// Register the TableSink with a specific schema
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
+// Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable")
+
// execute the program
{% endhighlight %}
</div>
@@ -458,8 +576,9 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
A Table API or SQL query is translated when:
-* the `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` is called.
-* the `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
+* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called.
+* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
+* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index fa4e3f3..b9205ab 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -49,15 +49,25 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// SQL query with an inlined (unregistered) table
Table table = tableEnv.toTable(ds, "user, product, amount");
-Table result = tableEnv.sql(
+Table result = tableEnv.sqlQuery(
"SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
// SQL query with a registered table
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// create and register a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+String[] fieldNames = {"product", "amount"};
+TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+ "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
{% endhighlight %}
</div>
@@ -71,15 +81,25 @@ val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
-val result = tableEnv.sql(
+val result = tableEnv.sqlQuery(
s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+
+// SQL update with a registered table
+// create and register a TableSink
+val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
+val fieldNames: Array[String] = Array("product", "amount")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+ "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
</div>
@@ -89,7 +109,7 @@ val result2 = tableEnv.sql(
Supported Syntax
----------------
-Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DML and DDL statements are not supported by Flink.
+Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DDL statements are not supported by Flink.
The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The [Operations](#operations) section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
@@ -156,6 +176,10 @@ groupItem:
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
+insert:
+ INSERT INTO tableReference
+ query
+
```
Flink SQL uses a lexical policy for identifier (table, attribute, function names) similar to Java:
@@ -566,6 +590,39 @@ LIMIT 3
{% top %}
+### Insert
+
+<div markdown="1">
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Operation</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ <strong>Insert Into</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+ </td>
+ <td>
+ <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight sql %}
+INSERT INTO OutputTable
+SELECT users, tag
+FROM Orders
+{% endhighlight %}
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+</div>
+
+{% top %}
+
### Group Windows
Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.
@@ -649,22 +706,22 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");
// compute SUM(amount) per day (in event-time)
-Table result1 = tableEnv.sql(
+Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
" TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
" SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");
// compute SUM(amount) per day (in processing-time)
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");
// compute every hour the SUM(amount) of the last 24 hours in event-time
-Table result3 = tableEnv.sql(
+Table result3 = tableEnv.sqlQuery(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-Table result4 = tableEnv.sql(
+Table result4 = tableEnv.sqlQuery(
"SELECT user, " +
" SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
" SESSION_END(rowtime, INTERVAL '12' HOUR) AS snd, " +
@@ -686,7 +743,7 @@ val ds: DataStream[(Long, String, Int)] = env.addSource(...)
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
// compute SUM(amount) per day (in event-time)
-val result1 = tableEnv.sql(
+val result1 = tableEnv.sqlQuery(
"""
|SELECT
| user,
@@ -697,15 +754,15 @@ val result1 = tableEnv.sql(
""".stripMargin)
// compute SUM(amount) per day (in processing-time)
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
// compute every hour the SUM(amount) of the last 24 hours in event-time
-val result3 = tableEnv.sql(
+val result3 = tableEnv.sqlQuery(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-val result4 = tableEnv.sql(
+val result4 = tableEnv.sqlQuery(
"""
|SELECT
| user,
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index fd57111..0a2acab 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -961,7 +961,52 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Operators</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ <strong>Order By</strong><br>
+ <span class="label label-primary">Batch</span>
+ </td>
+ <td>
+ <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc");
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ <strong>Limit</strong><br>
+ <span class="label label-primary">Batch</span>
+ </td>
+ <td>
+ <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3); // returns an unlimited number of records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+{% endhighlight %}
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+</div>
+<div data-lang="scala" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -970,7 +1015,7 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
</tr>
</thead>
<tbody>
- <tr>
+ <tr>
<td>
<strong>Order By</strong><br>
<span class="label label-primary">Batch</span>
@@ -1005,9 +1050,13 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
</tbody>
</table>
-
</div>
-<div data-lang="scala" markdown="1">
+</div>
+
+### Insert
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
<table class="table table-bordered">
<thead>
@@ -1017,35 +1066,49 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
</tr>
</thead>
<tbody>
- <tr>
+ <tr>
<td>
- <strong>Order By</strong><br>
- <span class="label label-primary">Batch</span>
+ <strong>Insert Into</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+ <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p>
+
+ <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc");
+Table orders = tableEnv.scan("Orders");
+orders.insertInto("OutOrders");
{% endhighlight %}
</td>
</tr>
+ </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Operators</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+ <tbody>
<tr>
<td>
- <strong>Limit</strong><br>
- <span class="label label-primary">Batch</span>
+ <strong>Insert Into</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+ <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p>
+
+ <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders")
+orders.insertInto("OutOrders")
{% endhighlight %}
</td>
</tr>
@@ -1055,6 +1118,8 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w
</div>
</div>
+{% top %}
+
### Group Windows
Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index 6c9bc1a..eef7db6 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -72,7 +72,7 @@ tableEnv.registerFunction("hashCode", new HashCode(10));
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
@@ -93,7 +93,7 @@ myTable.select('string, hashCode('string))
// register and use the function in SQL
tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
</div>
@@ -176,9 +176,9 @@ myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
</div>
@@ -206,9 +206,9 @@ tableEnv.registerFunction("split", new Split("#"))
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
</div>
@@ -572,7 +572,7 @@ StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());
// use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
{% endhighlight %}
</div>
@@ -649,7 +649,7 @@ val tEnv: StreamTableEnvironment = ???
tEnv.registerFunction("wAvg", new WeightedAvg())
// use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
{% endhighlight %}
</div>
@@ -720,7 +720,7 @@ tableEnv.registerFunction("hashCode", new HashCode());
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
@@ -748,7 +748,7 @@ myTable.select('string, hashCode('string))
// register and use the function in SQL
tableEnv.registerFunction("hashCode", hashCode)
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 5d71ca5..3da4230 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -155,7 +155,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
tableEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT " +
" h.family1.col1, " +
" h.family2.col1, " +
@@ -196,7 +196,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
tableEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT " +
" h.family1.col1, " +
" h.family3.col1, " +
@@ -236,7 +236,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
tableEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT * FROM hTable AS h"
);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
@@ -270,7 +270,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
tableEnv.registerFunction("toUTF8", new ToUTF8());
tableEnv.registerFunction("toLong", new ToLong());
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT " +
" toUTF8(h.family2.col1), " +
" toLong(h.family2.col2) " +
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
index 65efc17..22f0553 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
@@ -53,7 +53,7 @@ public class WordCountSQL {
tEnv.registerDataSet("WordCount", input, "word, frequency");
// run a SQL query on the Table and retrieve the result as a new Table
- Table table = tEnv.sql(
+ Table table = tEnv.sqlQuery(
"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index 665913e..3297aec 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -58,7 +58,7 @@ object StreamSQLExample {
tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
// union the two tables
- val result = tEnv.sql(
+ val result = tEnv.sqlQuery(
"SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
"SELECT * FROM OrderB WHERE amount < 2")
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
index a8b8268..55bbdb5 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
@@ -48,7 +48,7 @@ object WordCountSQL {
tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
// run a SQL query on the Table and retrieve the result as a new Table
- val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
+ val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
table.toDataSet[WC].print()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index a9d60dd..bca5826 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSinkTable, TableSourceTable}
import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -106,6 +106,54 @@ abstract class BatchTableEnvironment(
}
/**
+ * Registers an external [[TableSink]] with given field names and types in this
+ * [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * Example:
+ *
+ * {{{
+ * // create a table sink and its field names and types
+ * val fieldNames: Array[String] = Array("a", "b", "c")
+ * val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ * val tableSink: BatchTableSink = new YourTableSinkImpl(...)
+ *
+ * // register the table sink in the catalog
+ * tableEnv.registerTableSink("output_table", fieldNames, fieldTypes, tableSink)
+ *
+ * // use the registered sink
+ * tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+ * }}}
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param fieldNames The field names to register with the [[TableSink]].
+ * @param fieldTypes The field types to register with the [[TableSink]].
+ * @param tableSink The [[TableSink]] to register.
+ */
+ def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit = {
+
+ checkValidTableName(name)
+ if (fieldNames == null) throw TableException("fieldNames must not be null.")
+ if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+ if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+ if (fieldNames.length != fieldTypes.length) {
+ throw new TableException("Same number of field names and types required.")
+ }
+
+ tableSink match {
+ case batchTableSink: BatchTableSink[_] =>
+ val configuredSink = batchTableSink.configure(fieldNames, fieldTypes)
+ registerTableInternal(name, new TableSinkTable(configuredSink))
+ case _ =>
+ throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
+ }
+ }
+
+ /**
* Writes a [[Table]] to a [[TableSink]].
*
* Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8d8cebb..c7cc61b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -42,12 +42,12 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.conversion._
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
+import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
@@ -79,7 +79,7 @@ abstract class StreamTableEnvironment(
// the naming pattern for internally registered tables.
private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
- def queryConfig: StreamQueryConfig = new StreamQueryConfig
+ override def queryConfig: StreamQueryConfig = new StreamQueryConfig
/**
* Checks if the chosen table name is valid.
@@ -130,6 +130,60 @@ abstract class StreamTableEnvironment(
}
/**
+ * Registers an external [[TableSink]] with given field names and types in this
+ * [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * Example:
+ *
+ * {{{
+ * // create a table sink and its field names and types
+ * val fieldNames: Array[String] = Array("a", "b", "c")
+ * val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ * val tableSink: StreamTableSink = new YourTableSinkImpl(...)
+ *
+ * // register the table sink in the catalog
+ * tableEnv.registerTableSink("output_table", fieldNames, fieldTypes, tableSink)
+ *
+ * // use the registered sink
+ * tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+ * }}}
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param fieldNames The field names to register with the [[TableSink]].
+ * @param fieldTypes The field types to register with the [[TableSink]].
+ * @param tableSink The [[TableSink]] to register.
+ */
+ def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit = {
+
+ checkValidTableName(name)
+ if (fieldNames == null) throw TableException("fieldNames must not be null.")
+ if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+ if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+ if (fieldNames.length != fieldTypes.length) {
+ throw new TableException("Same number of field names and types required.")
+ }
+
+ tableSink match {
+ case streamTableSink@(
+ _: AppendStreamTableSink[_] |
+ _: UpsertStreamTableSink[_] |
+ _: RetractStreamTableSink[_]) =>
+
+ val configuredSink = streamTableSink.configure(fieldNames, fieldTypes)
+ registerTableInternal(name, new TableSinkTable(configuredSink))
+ case _ =>
+ throw new TableException(
+ "Only AppendStreamTableSink, UpsertStreamTableSink, and RetractStreamTableSink can be " +
+ "registered in StreamTableEnvironment.")
+ }
+ }
+
+ /**
* Writes a [[Table]] to a [[TableSink]].
*
* Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2e9e18f..0424cf8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql._
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.calcite.tools._
@@ -49,13 +49,13 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
-import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference, _}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions, _}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{RelTable, RowSchema}
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -110,6 +110,13 @@ abstract class TableEnvironment(val config: TableConfig) {
/** Returns the table config to define the runtime behavior of the Table API. */
def getConfig: TableConfig = config
+ /** Returns the [[QueryConfig]] matching the concrete type of this TableEnvironment. */
+ private[flink] def queryConfig: QueryConfig = this match {
+ case _: BatchTableEnvironment => new BatchQueryConfig
+ case _: StreamTableEnvironment => new StreamQueryConfig
+ case _ => null
+ }
+
/**
* Returns the operator table for this environment including a custom Calcite configuration.
*/
@@ -276,7 +283,8 @@ abstract class TableEnvironment(val config: TableConfig) {
s"${t.msg}\n" +
s"Please check the documentation for the set of currently supported SQL features.")
case a: AssertionError =>
- throw a.getCause
+ // keep original exception stack for caller
+ throw a
}
output
}
@@ -414,6 +422,22 @@ abstract class TableEnvironment(val config: TableConfig) {
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
/**
+ * Registers an external [[TableSink]] with given field names and types in this
+ * [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param fieldNames The field names to register with the [[TableSink]].
+ * @param fieldTypes The field types to register with the [[TableSink]].
+ * @param tableSink The [[TableSink]] to register.
+ */
+ def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit
+
+ /**
* Replaces a registered Table with another Table under the same name.
* We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
* with a [[org.apache.calcite.schema.TranslatableTable]].
@@ -489,9 +513,10 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
*
- * All tables referenced by the query must be registered in the TableEnvironment. But
- * [[Table.toString]] will automatically register an unique table name and return the
- * table name. So it allows to call SQL directly on tables like this:
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
*
* {{{
* val table: Table = ...
@@ -502,16 +527,110 @@ abstract class TableEnvironment(val config: TableConfig) {
* @param query The SQL query to evaluate.
* @return The result of the query as Table.
*/
+ @deprecated("Please use sqlQuery() instead.")
def sql(query: String): Table = {
+ sqlQuery(query)
+ }
+
+ /**
+ * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * val table: Table = ...
+ * // the table is not registered to the table environment
+ * tEnv.sqlQuery(s"SELECT * FROM $table")
+ * }}}
+ *
+ * @param query The SQL query to evaluate.
+ * @return The result of the query as Table
+ */
+ def sqlQuery(query: String): Table = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(query)
- // validate the sql query
- val validated = planner.validate(parsed)
- // transform to a relational tree
- val relational = planner.rel(validated)
+ if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+ // validate the sql query
+ val validated = planner.validate(parsed)
+ // transform to a relational tree
+ val relational = planner.rel(validated)
+ new Table(this, LogicalRelNode(relational.rel))
+ } else {
+ throw new TableException(
+ "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
+ }
+ }
+
+ /**
+ * Evaluates a SQL statement such as INSERT, UPDATE, or DELETE, or a DDL statement.
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * // register the table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", fieldNames, fieldTypes, tableSink)
+ * val sourceTable: Table = ...
+ * // sourceTable is not explicitly registered in the table environment
+ * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+ * }}}
+ *
+ * @param stmt The SQL statement to evaluate.
+ */
+ def sqlUpdate(stmt: String): Unit = {
+ sqlUpdate(stmt, this.queryConfig)
+ }
- new Table(this, LogicalRelNode(relational.rel))
+ /**
+ * Evaluates a SQL statement such as INSERT, UPDATE, or DELETE, or a DDL statement.
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * // register the table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", fieldNames, fieldTypes, tableSink)
+ * val sourceTable: Table = ...
+ * // sourceTable is not explicitly registered in the table environment
+ * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+ * }}}
+ *
+ * @param stmt The SQL statement to evaluate.
+ * @param config The [[QueryConfig]] to use.
+ */
+ def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
+ val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+ // parse the sql query
+ val parsed = planner.parse(stmt)
+ parsed match {
+ case insert: SqlInsert =>
+ // validate the SQL query
+ val query = insert.getSource
+ planner.validate(query)
+
+ // get query result as Table
+ val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel))
+
+ // get name of sink table
+ val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
+
+ // insert query result into sink table
+ insertInto(queryResult, targetTableName, config)
+ case _ =>
+ throw new TableException(
+ "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
+ }
}
/**
@@ -519,11 +638,62 @@ abstract class TableEnvironment(val config: TableConfig) {
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
+ * @param conf The [[QueryConfig]] to use.
* @tparam T The data type that the [[TableSink]] expects.
*/
private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
/**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * @param table The table to write to the TableSink.
+ * @param sinkTableName The name of the registered TableSink.
+ * @param conf The query configuration to use.
+ */
+ private[flink] def insertInto(table: Table, sinkTableName: String, conf: QueryConfig): Unit = {
+
+ // check that sink table exists
+ if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
+ if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
+ if (!isRegistered(sinkTableName)) {
+ throw TableException(s"No table was registered under the name $sinkTableName.")
+ }
+
+ getTable(sinkTableName) match {
+ case s: TableSinkTable[_] =>
+ val tableSink = s.tableSink
+ // validate schema of source table and table sink
+ val srcFieldTypes = table.getSchema.getTypes
+ val sinkFieldTypes = tableSink.getFieldTypes
+
+ if (srcFieldTypes.length != sinkFieldTypes.length ||
+ srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) {
+
+ val srcFieldNames = table.getSchema.getColumnNames
+ val sinkFieldNames = tableSink.getFieldNames
+
+ // format table and table sink schema strings
+ val srcSchema = srcFieldNames.zip(srcFieldTypes)
+ .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+ .mkString("[", ", ", "]")
+ val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
+ .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+ .mkString("[", ", ", "]")
+
+ throw ValidationException(
+ s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+ s"Query result schema: $srcSchema\n" +
+ s"TableSink schema: $sinkSchema")
+ }
+ // emit the table to the configured table sink
+ writeToSink(table, tableSink, conf)
+ case _ =>
+ throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
+ s"You can only emit query results to a registered TableSink.")
+ }
+ }
+
+ /**
* Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
*
* @param name The name under which the table will be registered.
@@ -554,10 +724,14 @@ abstract class TableEnvironment(val config: TableConfig) {
* @param name The table name to check.
* @return true, if a table is registered under the name, false otherwise.
*/
- protected def isRegistered(name: String): Boolean = {
+ protected[flink] def isRegistered(name: String): Boolean = {
rootSchema.getTableNames.contains(name)
}
+ private def getTable(name: String): org.apache.calcite.schema.Table = {
+ rootSchema.getTable(name)
+ }
+
protected def getRowType(name: String): RelDataType = {
rootSchema.getTable(name).getRowType(typeFactory)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
index c8fbab7..4aa5543 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.api
import _root_.java.io.Serializable
+
import org.apache.flink.api.common.time.Time
class QueryConfig private[table] extends Serializable {}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 2298575..30ed98e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionPar
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
import org.apache.flink.table.plan.logical.{Minus, _}
+import org.apache.flink.table.plan.schema.TableSinkTable
import org.apache.flink.table.sinks.TableSink
import _root_.scala.annotation.varargs
@@ -762,13 +763,10 @@ class Table(
* @tparam T The data type that the [[TableSink]] expects.
*/
def writeToSink[T](sink: TableSink[T]): Unit = {
-
- def queryConfig = this.tableEnv match {
- case s: StreamTableEnvironment => s.queryConfig
- case b: BatchTableEnvironment => new BatchQueryConfig
- case _ => null
+ val queryConfig = Option(this.tableEnv) match {
+ case None => null
+ case _ => this.tableEnv.queryConfig
}
-
writeToSink(sink, queryConfig)
}
@@ -800,6 +798,37 @@ class Table(
}
/**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * A batch [[Table]] can only be written to a
+ * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+ * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+ * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+ * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+ *
+ * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
+ */
+ def insertInto(tableName: String): Unit = {
+ tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+ }
+
+ /**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * A batch [[Table]] can only be written to a
+ * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+ * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+ * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+ * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+ *
+ * @param tableName Name of the [[TableSink]] to which the [[Table]] is written.
+ * @param conf The [[QueryConfig]] to use.
+ */
+ def insertInto(tableName: String, conf: QueryConfig): Unit = {
+ tableEnv.insertInto(this, tableName, conf)
+ }
+
+ /**
* Groups the records of a table by assigning them to windows defined by a time or row interval.
*
* For streaming tables of infinite size, grouping into windows is required to define finite
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index beb2436..4d9acaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -57,7 +57,6 @@ class FlinkPlannerImpl(
val defaultSchema: SchemaPlus = config.getDefaultSchema
var validator: FlinkCalciteSqlValidator = _
- var validatedSqlNode: SqlNode = _
var root: RelRoot = _
private def ready() {
@@ -85,16 +84,15 @@ class FlinkPlannerImpl(
validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
validator.setIdentifierExpansion(true)
try {
- validatedSqlNode = validator.validate(sqlNode)
+ validator.validate(sqlNode)
}
catch {
case e: RuntimeException =>
throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
}
- validatedSqlNode
}
- def rel(sql: SqlNode): RelRoot = {
+ def rel(validatedSqlNode: SqlNode): RelRoot = {
try {
assert(validatedSqlNode != null)
val rexBuilder: RexBuilder = createRexBuilder
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
new file mode 100644
index 0000000..f5e80d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
+
+/** Calcite table backed by a [[TableSink]]; its row type mirrors the sink's field names/types. */
+class TableSinkTable[T](
+ val tableSink: TableSink[T], // supplies the field names and types exposed as the row type
+ val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) // value returned by getStatistic
+ extends AbstractTable {
+
+ override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+ val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] // assumes a FlinkTypeFactory
+ flinkTypeFactory.buildLogicalRowType(tableSink.getFieldNames, tableSink.getFieldTypes)
+ }
+
+ /**
+ * Returns the statistic given at construction (FlinkStatistic.UNKNOWN if none was passed).
+ *
+ * @return statistics of this sink table
+ */
+ override def getStatistic: Statistic = statistic
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
index eb97afe..672b6fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
@@ -80,7 +80,7 @@ public class JavaTableSourceITCase extends TableProgramsCollectionTestBase {
tableEnv.registerTableSource("persons", csvTable);
Table result = tableEnv
- .sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
+ .sqlQuery("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
index 3c8a1cc..455e8ce 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -186,7 +186,7 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
* @param expected Expected result.
*/
private void checkSql(String query, String expected) throws Exception {
- Table resultTable = tableEnv.sql(query);
+ Table resultTable = tableEnv.sqlQuery(query);
DataSet<Row> resultDataSet = tableEnv.toDataSet(resultTable, Row.class);
List<Row> results = resultDataSet.collect();
TestBaseUtils.compareResultAsText(results, expected);
@@ -204,12 +204,12 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
};
// Execute first query and store results
- Table resultTable1 = tableEnv.sql(query1);
+ Table resultTable1 = tableEnv.sqlQuery(query1);
DataSet<Row> resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class);
List<String> results1 = resultDataSet1.map(mapFunction).collect();
// Execute second query and store results
- Table resultTable2 = tableEnv.sql(query2);
+ Table resultTable2 = tableEnv.sqlQuery(query2);
DataSet<Row> resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class);
List<String> results2 = resultDataSet2.map(mapFunction).collect();
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
index c5a394a..f9693fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
@@ -61,7 +61,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
"(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
"(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
@@ -83,7 +83,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerTable("T", in);
String sqlQuery = "SELECT a, c FROM T";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -106,7 +106,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -123,7 +123,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("AggTable", ds, "x, y, z");
String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -143,7 +143,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -168,7 +168,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("t1", ds1, "a, b");
String sqlQuery = "SELECT b['foo'] FROM t1";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
index c6368d4..f3d0309 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -68,7 +68,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
tableEnv.registerTable("MyTableRow", in);
String sqlQuery = "SELECT a,c FROM MyTableRow";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -93,7 +93,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
tableEnv.registerTable("MyTable", in);
String sqlQuery = "SELECT * FROM MyTable";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -117,7 +117,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -148,7 +148,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
String sqlQuery = "SELECT * FROM T1 " +
"UNION ALL " +
"(SELECT a, b, c FROM T2 WHERE a < 3)";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index c9a7049..dde5569 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -31,7 +31,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
val util = batchTestUtil()
val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
- val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+ val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
val expected = unaryNode(
"DataSetCalc",
@@ -43,7 +43,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
- val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
+ val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
val join = unaryNode(
"DataSetJoin",
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
index 9aada9a..69b12b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
@@ -34,6 +34,6 @@ class CalcValidationTest extends TableTestBase {
val sqlQuery = "SELECT a, foo FROM MyTable"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..ef9e6a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+/** Validation tests for SQL INSERT INTO into registered table sinks (batch environment). */
+class InsertIntoValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail: the sink declares two fields (d, e) but the query selects three (a, b, c)
+ util.tableEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail: sink types (STRING, INT, LONG) differ from query types (Int, Long, String)
+ util.tableEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnsupportedPartialInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = util.tableEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+ // must fail: partial insert into (d, f) is not supported yet; sink types match the source
+ util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
index 90bcfec..d9e0e10 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -35,7 +35,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
@Test(expected = classOf[TableException])
@@ -46,7 +46,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[ValidationException])
@@ -57,7 +57,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -68,7 +68,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -79,7 +79,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -90,7 +90,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -101,7 +101,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -112,7 +112,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -123,7 +123,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -134,7 +134,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -145,6 +145,6 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
index 7e72a21..dfbdd5a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
@@ -41,7 +41,7 @@ class OverWindowValidationTest extends TableTestBase {
util.addFunction("overAgg", new OverAgg0)
val sqlQuery = "SELECT overAgg(b, a) FROM T"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
/**
@@ -55,6 +55,6 @@ class OverWindowValidationTest extends TableTestBase {
val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
}