You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/20 08:13:16 UTC
[1/4] flink git commit: [FLINK-6442] [table] Add registration for
TableSinks and INSERT INTO support for SQL and Table API.
Repository: flink
Updated Branches:
refs/heads/master 73a2443d4 -> 2cb37cb93
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index d99aac1..0a0d12e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -178,7 +178,7 @@ case class BatchTableTestUtil() extends TableTestUtil {
}
def verifySql(query: String, expected: String): Unit = {
- verifyTable(tableEnv.sql(query), expected)
+ verifyTable(tableEnv.sqlQuery(query), expected)
}
def verifyTable(resultTable: Table, expected: String): Unit = {
@@ -197,7 +197,7 @@ case class BatchTableTestUtil() extends TableTestUtil {
}
def printSql(query: String): Unit = {
- printTable(tableEnv.sql(query))
+ printTable(tableEnv.sqlQuery(query))
}
}
@@ -256,7 +256,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
}
def verifySql(query: String, expected: String): Unit = {
- verifyTable(tableEnv.sql(query), expected)
+ verifyTable(tableEnv.sqlQuery(query), expected)
}
def verifyTable(resultTable: Table, expected: String): Unit = {
@@ -276,7 +276,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
}
def printSql(query: String): Unit = {
- printTable(tableEnv.sql(query))
+ printTable(tableEnv.sqlQuery(query))
}
}
[3/4] flink git commit: [FLINK-6442] [table] Add registration for
TableSinks and INSERT INTO support for SQL and Table API.
Posted by fh...@apache.org.
[FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.
This closes #3829.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb37cb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb37cb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb37cb9
Branch: refs/heads/master
Commit: 2cb37cb937c6f225ad7afe829a28a6eda043ffc1
Parents: df5efe9
Author: lincoln-lil <li...@gmail.com>
Authored: Thu May 4 17:52:34 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 20 10:12:13 2017 +0200
----------------------------------------------------------------------
docs/dev/table/common.md | 153 ++++++++++++--
docs/dev/table/sql.md | 83 ++++++--
docs/dev/table/tableApi.md | 105 ++++++++--
docs/dev/table/udfs.md | 20 +-
.../addons/hbase/HBaseConnectorITCase.java | 8 +-
.../flink/table/examples/java/WordCountSQL.java | 2 +-
.../table/examples/scala/StreamSQLExample.scala | 2 +-
.../table/examples/scala/WordCountSQL.scala | 2 +-
.../flink/table/api/BatchTableEnvironment.scala | 50 ++++-
.../table/api/StreamTableEnvironment.scala | 60 +++++-
.../flink/table/api/TableEnvironment.scala | 202 +++++++++++++++++--
.../apache/flink/table/api/queryConfig.scala | 1 +
.../org/apache/flink/table/api/table.scala | 41 +++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 6 +-
.../table/plan/schema/TableSinkTable.scala | 45 +++++
.../runtime/batch/JavaTableSourceITCase.java | 2 +-
.../runtime/batch/sql/GroupingSetsITCase.java | 6 +-
.../table/runtime/batch/sql/JavaSqlITCase.java | 12 +-
.../table/runtime/stream/sql/JavaSqlITCase.java | 8 +-
.../api/batch/BatchTableEnvironmentTest.scala | 4 +-
.../sql/validation/CalcValidationTest.scala | 2 +-
.../validation/InsertIntoValidationTest.scala | 77 +++++++
.../sql/validation/JoinValidationTest.scala | 22 +-
.../validation/OverWindowValidationTest.scala | 4 +-
.../sql/validation/SortValidationTest.scala | 2 +-
.../validation/InsertIntoValidationTest.scala | 61 ++++++
.../api/stream/StreamTableEnvironmentTest.scala | 4 +-
.../flink/table/api/stream/sql/JoinTest.scala | 4 +-
.../validation/InsertIntoValidationTest.scala | 87 ++++++++
.../validation/OverWindowValidationTest.scala | 6 +-
.../validation/CorrelateValidationTest.scala | 6 +-
.../validation/InsertIntoValidationTest.scala | 68 +++++++
.../validation/TableSinksValidationTest.scala | 27 ++-
.../expressions/utils/ExpressionTestBase.scala | 1 -
.../plan/ExpressionReductionRulesTest.scala | 4 +-
.../flink/table/plan/RetractionRulesTest.scala | 2 +-
.../plan/TimeIndicatorConversionTest.scala | 8 +-
.../runtime/batch/sql/AggregateITCase.scala | 34 ++--
.../table/runtime/batch/sql/CalcITCase.scala | 26 +--
.../table/runtime/batch/sql/JoinITCase.scala | 44 ++--
.../runtime/batch/sql/SetOperatorsITCase.scala | 24 +--
.../table/runtime/batch/sql/SortITCase.scala | 8 +-
.../batch/sql/TableEnvironmentITCase.scala | 35 +++-
.../runtime/batch/sql/TableSourceITCase.scala | 4 +-
.../table/runtime/batch/table/CalcITCase.scala | 6 +-
.../batch/table/TableEnvironmentITCase.scala | 24 +++
.../runtime/stream/TimeAttributesITCase.scala | 4 +-
.../table/runtime/stream/sql/JoinITCase.scala | 4 +-
.../runtime/stream/sql/OverWindowITCase.scala | 30 +--
.../table/runtime/stream/sql/SortITCase.scala | 2 +-
.../table/runtime/stream/sql/SqlITCase.scala | 59 ++++--
.../runtime/stream/sql/TableSourceITCase.scala | 2 +-
.../runtime/stream/table/TableSinkITCase.scala | 32 ++-
.../flink/table/utils/MemoryTableSinkUtil.scala | 84 ++++++++
.../table/utils/MockTableEnvironment.scala | 6 +
.../flink/table/utils/TableTestBase.scala | 8 +-
56 files changed, 1371 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/common.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index fed2b6d..acd5711 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -50,7 +50,7 @@ tableEnv.registerExternalCatalog("extCat", ...);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
-Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... ");
+Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);
@@ -77,7 +77,7 @@ tableEnv.registerExternalCatalog("extCat", ...)
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
-val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
+val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)
@@ -149,18 +149,18 @@ val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
{% top %}
-Register a Table in the Catalog
+Register Tables in the Catalog
-------------------------------
-A `TableEnvironment` has an internal catalog of tables, organized by table name. Table API or SQL queries can access tables which are registered in the catalog, by referencing them by name.
+A `TableEnvironment` maintains a catalog of tables which are registered by name. There are two types of tables, *input tables* and *output tables*. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.
-A `TableEnvironment` allows you to register a table from various sources:
+An input table can be registered from various sources:
* an existing `Table` object, usually the result of a Table API or SQL query.
* a `TableSource`, which accesses external data, such as a file, database, or messaging system.
-* a `DataStream` or `DataSet` from a DataStream or DataSet program.
+* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
-Registering a `DataStream` or `DataSet` as a table is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
+An output table can be registerd using a `TableSink`.
### Register a Table
@@ -200,7 +200,7 @@ tableEnv.registerTable("projectedTable", projTable)
### Register a TableSource
-A `TableSource` provides access to external data which is stored in a storage systems such as a database (MySQL, HBase, ...), a file with specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).
+A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).
Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`.
@@ -236,6 +236,52 @@ tableEnv.registerTableSource("CsvTable", csvSource)
{% top %}
+### Register a TableSink
+
+A registered `TableSink` can be used to [emit the result of a Table API or SQL query](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache \[Parquet, Avro, ORC\], ...).
+
+Flink aims to provide TableSinks for common data formats and storage systems. Please see the documentation about [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for details about available sinks and instructions for how to implement a custom `TableSink`.
+
+A `TableSink` is registered in a `TableEnvironment` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// create a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+
+// define the field names and types
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// create a TableSink
+val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
+
+// define the field names and types
+val fieldNames: Arary[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
Register an External Catalog
----------------------------
@@ -342,7 +388,7 @@ Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org)
The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables.
-The following example shows how to specify a query and return the result as a Table.
+The following example shows how to specify a query and return the result as a `Table`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -353,7 +399,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// compute revenue for all customers from France
-Table revenue = tableEnv.sql(
+Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
@@ -373,7 +419,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table
// compute revenue for all customers from France
-Table revenue = tableEnv.sql("""
+Table revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
@@ -387,6 +433,53 @@ Table revenue = tableEnv.sql("""
</div>
</div>
+The following example shows how to specify an update query that inserts its result into a registered table.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate(
+ "INSERT INTO RevenueFrance " +
+ "SELECT cID, cName, SUM(revenue) AS revSum " +
+ "FROM Orders " +
+ "WHERE cCountry = 'FRANCE' " +
+ "GROUP BY cID, cName"
+ );
+
+// execute query
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate("""
+ |INSERT INTO RevenueFrance
+ |SELECT cID, cName, SUM(revenue) AS revSum
+ |FROM Orders
+ |WHERE cCountry = 'FRANCE'
+ |GROUP BY cID, cName
+ """.stripMargin)
+
+// execute query
+{% endhighlight %}
+
+</div>
+</div>
+
{% top %}
### Mixing Table API and SQL
@@ -401,12 +494,19 @@ Table API and SQL queries can be easily mixed because both return `Table` object
Emit a Table
------------
-In order to emit a `Table`, it can be written to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
+A `Table` is emitted by writing it to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
-A batch `Table` can only be written to a `BatchTableSink`, while a streaming table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`.
+A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Table` requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`.
Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
+There are two ways to emit a table:
+
+1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
+2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
+
+The following examples shows how to emit a `Table`:
+
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
@@ -419,9 +519,18 @@ Table result = ...
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
-// write the result Table to the TableSink
+// METHOD 1:
+// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink);
+// METHOD 2:
+// Register the TableSink with a specific schema
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
+// Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable");
+
// execute the program
{% endhighlight %}
</div>
@@ -437,9 +546,18 @@ val result: Table = ...
// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
-// write the result Table to the TableSink
+// METHOD 1:
+// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink)
+// METHOD 2:
+// Register the TableSink with a specific schema
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
+// Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable")
+
// execute the program
{% endhighlight %}
</div>
@@ -458,8 +576,9 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
A Table API or SQL query is translated when:
-* the `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` is called.
-* the `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
+* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called.
+* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
+* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index fa4e3f3..b9205ab 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -49,15 +49,25 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// SQL query with an inlined (unregistered) table
Table table = tableEnv.toTable(ds, "user, product, amount");
-Table result = tableEnv.sql(
+Table result = tableEnv.sqlQuery(
"SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
// SQL query with a registered table
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// create and register a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+String[] fieldNames = {"product", "amount"};
+TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+ "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
{% endhighlight %}
</div>
@@ -71,15 +81,25 @@ val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
-val result = tableEnv.sql(
+val result = tableEnv.sqlQuery(
s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+
+// SQL update with a registered table
+// create and register a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...)
+val fieldNames: Arary[String] = Array("product", "amount")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+ "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
</div>
@@ -89,7 +109,7 @@ val result2 = tableEnv.sql(
Supported Syntax
----------------
-Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DML and DDL statements are not supported by Flink.
+Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DDL statements are not supported by Flink.
The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The [Operations](#operations) section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
@@ -156,6 +176,10 @@ groupItem:
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
+insert:
+ INSERT INTO tableReference
+ query
+
```
Flink SQL uses a lexical policy for identifier (table, attribute, function names) similar to Java:
@@ -566,6 +590,39 @@ LIMIT 3
{% top %}
+### Insert
+
+<div markdown="1">
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Operation</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ <strong>Insert Into</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+ </td>
+ <td>
+ <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight sql %}
+INSERT INTO OutputTable
+SELECT users, tag
+FROM Orders
+{% endhighlight %}
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+</div>
+
+{% top %}
+
### Group Windows
Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.
@@ -649,22 +706,22 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");
// compute SUM(amount) per day (in event-time)
-Table result1 = tableEnv.sql(
+Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
" TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
" SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");
// compute SUM(amount) per day (in processing-time)
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");
// compute every hour the SUM(amount) of the last 24 hours in event-time
-Table result3 = tableEnv.sql(
+Table result3 = tableEnv.sqlQuery(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-Table result4 = tableEnv.sql(
+Table result4 = tableEnv.sqlQuery(
"SELECT user, " +
" SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
" SESSION_END(rowtime, INTERVAL '12' HOUR) AS snd, " +
@@ -686,7 +743,7 @@ val ds: DataStream[(Long, String, Int)] = env.addSource(...)
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
// compute SUM(amount) per day (in event-time)
-val result1 = tableEnv.sql(
+val result1 = tableEnv.sqlQuery(
"""
|SELECT
| user,
@@ -697,15 +754,15 @@ val result1 = tableEnv.sql(
""".stripMargin)
// compute SUM(amount) per day (in processing-time)
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
// compute every hour the SUM(amount) of the last 24 hours in event-time
-val result3 = tableEnv.sql(
+val result3 = tableEnv.sqlQuery(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-val result4 = tableEnv.sql(
+val result4 = tableEnv.sqlQuery(
"""
|SELECT
| user,
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index fd57111..0a2acab 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -961,7 +961,52 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Operators</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>
+ <strong>Order By</strong><br>
+ <span class="label label-primary">Batch</span>
+ </td>
+ <td>
+ <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc");
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ <strong>Limit</strong><br>
+ <span class="label label-primary">Batch</span>
+ </td>
+ <td>
+ <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+{% endhighlight %}
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+</div>
+<div data-lang="scala" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
@@ -970,7 +1015,7 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
</tr>
</thead>
<tbody>
- <tr>
+ <tr>
<td>
<strong>Order By</strong><br>
<span class="label label-primary">Batch</span>
@@ -1005,9 +1050,13 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
</tbody>
</table>
-
</div>
-<div data-lang="scala" markdown="1">
+</div>
+
+### Insert
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
<table class="table table-bordered">
<thead>
@@ -1017,35 +1066,49 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
</tr>
</thead>
<tbody>
- <tr>
+ <tr>
<td>
- <strong>Order By</strong><br>
- <span class="label label-primary">Batch</span>
+ <strong>Insert Into</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+ <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+
+ <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc");
+Table orders = tableEnv.scan("Orders");
+orders.insertInto("OutOrders");
{% endhighlight %}
</td>
</tr>
+ </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Operators</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+ <tbody>
<tr>
<td>
- <strong>Limit</strong><br>
- <span class="label label-primary">Batch</span>
+ <strong>Insert Into</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+ <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+
+ <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders")
+orders.insertInto("OutOrders")
{% endhighlight %}
</td>
</tr>
@@ -1055,6 +1118,8 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w
</div>
</div>
+{% top %}
+
### Group Windows
Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index 6c9bc1a..eef7db6 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -72,7 +72,7 @@ tableEnv.registerFunction("hashCode", new HashCode(10));
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
@@ -93,7 +93,7 @@ myTable.select('string, hashCode('string))
// register and use the function in SQL
tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
</div>
@@ -176,9 +176,9 @@ myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
</div>
@@ -206,9 +206,9 @@ tableEnv.registerFunction("split", new Split("#"))
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
</div>
@@ -572,7 +572,7 @@ StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());
// use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
{% endhighlight %}
</div>
@@ -649,7 +649,7 @@ val tEnv: StreamTableEnvironment = ???
tEnv.registerFunction("wAvg", new WeightedAvg())
// use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
{% endhighlight %}
</div>
@@ -720,7 +720,7 @@ tableEnv.registerFunction("hashCode", new HashCode());
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
@@ -748,7 +748,7 @@ myTable.select('string, hashCode('string))
// register and use the function in SQL
tableEnv.registerFunction("hashCode", hashCode)
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 5d71ca5..3da4230 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -155,7 +155,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
tableEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT " +
" h.family1.col1, " +
" h.family2.col1, " +
@@ -196,7 +196,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
tableEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT " +
" h.family1.col1, " +
" h.family3.col1, " +
@@ -236,7 +236,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
tableEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT * FROM hTable AS h"
);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
@@ -270,7 +270,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
tableEnv.registerFunction("toUTF8", new ToUTF8());
tableEnv.registerFunction("toLong", new ToLong());
- Table result = tableEnv.sql(
+ Table result = tableEnv.sqlQuery(
"SELECT " +
" toUTF8(h.family2.col1), " +
" toLong(h.family2.col2) " +
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
index 65efc17..22f0553 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
@@ -53,7 +53,7 @@ public class WordCountSQL {
tEnv.registerDataSet("WordCount", input, "word, frequency");
// run a SQL query on the Table and retrieve the result as a new Table
- Table table = tEnv.sql(
+ Table table = tEnv.sqlQuery(
"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index 665913e..3297aec 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -58,7 +58,7 @@ object StreamSQLExample {
tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
// union the two tables
- val result = tEnv.sql(
+ val result = tEnv.sqlQuery(
"SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
"SELECT * FROM OrderB WHERE amount < 2")
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
index a8b8268..55bbdb5 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
@@ -48,7 +48,7 @@ object WordCountSQL {
tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
// run a SQL query on the Table and retrieve the result as a new Table
- val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
+ val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
table.toDataSet[WC].print()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index a9d60dd..bca5826 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSinkTable, TableSourceTable}
import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -106,6 +106,54 @@ abstract class BatchTableEnvironment(
}
/**
+ * Registers an external [[TableSink]] with given field names and types in this
+ * [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * Example:
+ *
+ * {{{
+ * // create a table sink and its field names and types
+ * val fieldNames: Array[String] = Array("a", "b", "c")
+ * val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ * val tableSink: BatchTableSink = new YourTableSinkImpl(...)
+ *
+ * // register the table sink in the catalog
+ * tableEnv.registerTableSink("output_table", fieldNames, fieldsTypes, tableSink)
+ *
+ * // use the registered sink
+ * tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+ * }}}
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param fieldNames The field names to register with the [[TableSink]].
+ * @param fieldTypes The field types to register with the [[TableSink]].
+ * @param tableSink The [[TableSink]] to register.
+ */
+ def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit = {
+
+ checkValidTableName(name)
+ if (fieldNames == null) throw TableException("fieldNames must not be null.")
+ if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+ if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+ if (fieldNames.length != fieldTypes.length) {
+ throw new TableException("Same number of field names and types required.")
+ }
+
+ tableSink match {
+ case batchTableSink: BatchTableSink[_] =>
+ val configuredSink = batchTableSink.configure(fieldNames, fieldTypes)
+ registerTableInternal(name, new TableSinkTable(configuredSink))
+ case _ =>
+ throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
+ }
+ }
+
+ /**
* Writes a [[Table]] to a [[TableSink]].
*
* Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8d8cebb..c7cc61b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -42,12 +42,12 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.conversion._
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
+import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
@@ -79,7 +79,7 @@ abstract class StreamTableEnvironment(
// the naming pattern for internally registered tables.
private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
- def queryConfig: StreamQueryConfig = new StreamQueryConfig
+ override def queryConfig: StreamQueryConfig = new StreamQueryConfig
/**
* Checks if the chosen table name is valid.
@@ -130,6 +130,60 @@ abstract class StreamTableEnvironment(
}
/**
+ * Registers an external [[TableSink]] with given field names and types in this
+ * [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * Example:
+ *
+ * {{{
+ * // create a table sink and its field names and types
+ * val fieldNames: Array[String] = Array("a", "b", "c")
+ * val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ * val tableSink: StreamTableSink = new YourTableSinkImpl(...)
+ *
+ * // register the table sink in the catalog
+ * tableEnv.registerTableSink("output_table", fieldNames, fieldsTypes, tableSink)
+ *
+ * // use the registered sink
+ * tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+ * }}}
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param fieldNames The field names to register with the [[TableSink]].
+ * @param fieldTypes The field types to register with the [[TableSink]].
+ * @param tableSink The [[TableSink]] to register.
+ */
+ def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit = {
+
+ checkValidTableName(name)
+ if (fieldNames == null) throw TableException("fieldNames must not be null.")
+ if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+ if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+ if (fieldNames.length != fieldTypes.length) {
+ throw new TableException("Same number of field names and types required.")
+ }
+
+ tableSink match {
+ case streamTableSink@(
+ _: AppendStreamTableSink[_] |
+ _: UpsertStreamTableSink[_] |
+ _: RetractStreamTableSink[_]) =>
+
+ val configuredSink = streamTableSink.configure(fieldNames, fieldTypes)
+ registerTableInternal(name, new TableSinkTable(configuredSink))
+ case _ =>
+ throw new TableException(
+ "Only AppendStreamTableSink, UpsertStreamTableSink, and RetractStreamTableSink can be " +
+ "registered in StreamTableEnvironment.")
+ }
+ }
+
+ /**
* Writes a [[Table]] to a [[TableSink]].
*
* Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2e9e18f..0424cf8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql._
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.calcite.tools._
@@ -49,13 +49,13 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
-import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference, _}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions, _}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{RelTable, RowSchema}
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -110,6 +110,13 @@ abstract class TableEnvironment(val config: TableConfig) {
/** Returns the table config to define the runtime behavior of the Table API. */
def getConfig: TableConfig = config
+ /** Returns the [[QueryConfig]] depends on the concrete type of this TableEnvironment. */
+ private[flink] def queryConfig: QueryConfig = this match {
+ case _: BatchTableEnvironment => new BatchQueryConfig
+ case _: StreamTableEnvironment => new StreamQueryConfig
+ case _ => null
+ }
+
/**
* Returns the operator table for this environment including a custom Calcite configuration.
*/
@@ -276,7 +283,8 @@ abstract class TableEnvironment(val config: TableConfig) {
s"${t.msg}\n" +
s"Please check the documentation for the set of currently supported SQL features.")
case a: AssertionError =>
- throw a.getCause
+ // keep original exception stack for caller
+ throw a
}
output
}
@@ -414,6 +422,22 @@ abstract class TableEnvironment(val config: TableConfig) {
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
/**
+ * Registers an external [[TableSink]] with given field names and types in this
+ * [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param fieldNames The field names to register with the [[TableSink]].
+ * @param fieldTypes The field types to register with the [[TableSink]].
+ * @param tableSink The [[TableSink]] to register.
+ */
+ def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit
+
+ /**
* Replaces a registered Table with another Table under the same name.
* We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
* with a [[org.apache.calcite.schema.TranslatableTable]].
@@ -489,9 +513,10 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
*
- * All tables referenced by the query must be registered in the TableEnvironment. But
- * [[Table.toString]] will automatically register an unique table name and return the
- * table name. So it allows to call SQL directly on tables like this:
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
*
* {{{
* val table: Table = ...
@@ -502,16 +527,110 @@ abstract class TableEnvironment(val config: TableConfig) {
* @param query The SQL query to evaluate.
* @return The result of the query as Table.
*/
+ @deprecated("Please use sqlQuery() instead.")
def sql(query: String): Table = {
+ sqlQuery(query)
+ }
+
+ /**
+ * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * val table: Table = ...
+ * // the table is not registered to the table environment
+ * tEnv.sqlQuery(s"SELECT * FROM $table")
+ * }}}
+ *
+ * @param query The SQL query to evaluate.
+ * @return The result of the query as Table
+ */
+ def sqlQuery(query: String): Table = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(query)
- // validate the sql query
- val validated = planner.validate(parsed)
- // transform to a relational tree
- val relational = planner.rel(validated)
+ if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+ // validate the sql query
+ val validated = planner.validate(parsed)
+ // transform to a relational tree
+ val relational = planner.rel(validated)
+ new Table(this, LogicalRelNode(relational.rel))
+ } else {
+ throw new TableException(
+ "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
+ }
+ }
+
+ /**
+ * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * // register the table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", fieldNames, fieldsTypes, tableSink)
+ * val sourceTable: Table = ...
+ * // sourceTable is not registered to the table environment
+ * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+ * }}}
+ *
+ * @param stmt The SQL statement to evaluate.
+ */
+ def sqlUpdate(stmt: String): Unit = {
+ sqlUpdate(stmt, this.queryConfig)
+ }
- new Table(this, LogicalRelNode(relational.rel))
+ /**
+ * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+ * when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * // register the table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", fieldNames, fieldsTypes, tableSink)
+ * val sourceTable: Table = ...
+ * // sourceTable is not registered to the table environment
+ * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+ * }}}
+ *
+ * @param stmt The SQL statement to evaluate.
+ * @param config The [[QueryConfig]] to use.
+ */
+ def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
+ val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+ // parse the sql query
+ val parsed = planner.parse(stmt)
+ parsed match {
+ case insert: SqlInsert =>
+ // validate the SQL query
+ val query = insert.getSource
+ planner.validate(query)
+
+ // get query result as Table
+ val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel))
+
+ // get name of sink table
+ val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
+
+ // insert query result into sink table
+ insertInto(queryResult, targetTableName, config)
+ case _ =>
+ throw new TableException(
+ "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
+ }
}
/**
@@ -519,11 +638,62 @@ abstract class TableEnvironment(val config: TableConfig) {
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
+ * @param conf The [[QueryConfig]] to use.
* @tparam T The data type that the [[TableSink]] expects.
*/
private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
/**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * @param table The table to write to the TableSink.
+ * @param sinkTableName The name of the registered TableSink.
+ * @param conf The query configuration to use.
+ */
+ private[flink] def insertInto(table: Table, sinkTableName: String, conf: QueryConfig): Unit = {
+
+ // check that sink table exists
+ if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
+ if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
+ if (!isRegistered(sinkTableName)) {
+ throw TableException(s"No table was registered under the name $sinkTableName.")
+ }
+
+ getTable(sinkTableName) match {
+ case s: TableSinkTable[_] =>
+ val tableSink = s.tableSink
+ // validate schema of source table and table sink
+ val srcFieldTypes = table.getSchema.getTypes
+ val sinkFieldTypes = tableSink.getFieldTypes
+
+ if (srcFieldTypes.length != sinkFieldTypes.length ||
+ srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) {
+
+ val srcFieldNames = table.getSchema.getColumnNames
+ val sinkFieldNames = tableSink.getFieldNames
+
+ // format table and table sink schema strings
+ val srcSchema = srcFieldNames.zip(srcFieldTypes)
+ .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+ .mkString("[", ", ", "]")
+ val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
+ .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+ .mkString("[", ", ", "]")
+
+ throw ValidationException(
+ s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+ s"Query result schema: $srcSchema\n" +
+ s"TableSink schema: $sinkSchema")
+ }
+ // emit the table to the configured table sink
+ writeToSink(table, tableSink, conf)
+ case _ =>
+ throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
+ s"You can only emit query results to a registered TableSink.")
+ }
+ }
+
+ /**
* Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
*
* @param name The name under which the table will be registered.
@@ -554,10 +724,14 @@ abstract class TableEnvironment(val config: TableConfig) {
* @param name The table name to check.
* @return true, if a table is registered under the name, false otherwise.
*/
- protected def isRegistered(name: String): Boolean = {
+ protected[flink] def isRegistered(name: String): Boolean = {
rootSchema.getTableNames.contains(name)
}
+ private def getTable(name: String): org.apache.calcite.schema.Table = {
+ rootSchema.getTable(name)
+ }
+
protected def getRowType(name: String): RelDataType = {
rootSchema.getTable(name).getRowType(typeFactory)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
index c8fbab7..4aa5543 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.api
import _root_.java.io.Serializable
+
import org.apache.flink.api.common.time.Time
class QueryConfig private[table] extends Serializable {}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 2298575..30ed98e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionPar
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
import org.apache.flink.table.plan.logical.{Minus, _}
+import org.apache.flink.table.plan.schema.TableSinkTable
import org.apache.flink.table.sinks.TableSink
import _root_.scala.annotation.varargs
@@ -762,13 +763,10 @@ class Table(
* @tparam T The data type that the [[TableSink]] expects.
*/
def writeToSink[T](sink: TableSink[T]): Unit = {
-
- def queryConfig = this.tableEnv match {
- case s: StreamTableEnvironment => s.queryConfig
- case b: BatchTableEnvironment => new BatchQueryConfig
- case _ => null
+ val queryConfig = Option(this.tableEnv) match {
+ case None => null
+ case _ => this.tableEnv.queryConfig
}
-
writeToSink(sink, queryConfig)
}
@@ -800,6 +798,37 @@ class Table(
}
/**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * A batch [[Table]] can only be written to a
+ * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+ * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+ * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+ * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+ *
+ * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
+ */
+ def insertInto(tableName: String): Unit = {
+ tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+ }
+
+ /**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * A batch [[Table]] can only be written to a
+ * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+ * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+ * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+ * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+ *
+ * @param tableName Name of the [[TableSink]] to which the [[Table]] is written.
+ * @param conf The [[QueryConfig]] to use.
+ */
+ def insertInto(tableName: String, conf: QueryConfig): Unit = {
+ tableEnv.insertInto(this, tableName, conf)
+ }
+
+ /**
* Groups the records of a table by assigning them to windows defined by a time or row interval.
*
* For streaming tables of infinite size, grouping into windows is required to define finite
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index beb2436..4d9acaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -57,7 +57,6 @@ class FlinkPlannerImpl(
val defaultSchema: SchemaPlus = config.getDefaultSchema
var validator: FlinkCalciteSqlValidator = _
- var validatedSqlNode: SqlNode = _
var root: RelRoot = _
private def ready() {
@@ -85,16 +84,15 @@ class FlinkPlannerImpl(
validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
validator.setIdentifierExpansion(true)
try {
- validatedSqlNode = validator.validate(sqlNode)
+ validator.validate(sqlNode)
}
catch {
case e: RuntimeException =>
throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
}
- validatedSqlNode
}
- def rel(sql: SqlNode): RelRoot = {
+ def rel(validatedSqlNode: SqlNode): RelRoot = {
try {
assert(validatedSqlNode != null)
val rexBuilder: RexBuilder = createRexBuilder
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
new file mode 100644
index 0000000..f5e80d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
+
+/** Table which defines an external table via a [[TableSink]] */
+class TableSinkTable[T](
+ val tableSink: TableSink[T],
+ val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+ extends AbstractTable {
+
+ override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+ val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+ flinkTypeFactory.buildLogicalRowType(tableSink.getFieldNames, tableSink.getFieldTypes)
+ }
+
+ /**
+ * Returns statistics of current table
+ *
+ * @return statistics of current table
+ */
+ override def getStatistic: Statistic = statistic
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
index eb97afe..672b6fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
@@ -80,7 +80,7 @@ public class JavaTableSourceITCase extends TableProgramsCollectionTestBase {
tableEnv.registerTableSource("persons", csvTable);
Table result = tableEnv
- .sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
+ .sqlQuery("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
index 3c8a1cc..455e8ce 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -186,7 +186,7 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
* @param expected Expected result.
*/
private void checkSql(String query, String expected) throws Exception {
- Table resultTable = tableEnv.sql(query);
+ Table resultTable = tableEnv.sqlQuery(query);
DataSet<Row> resultDataSet = tableEnv.toDataSet(resultTable, Row.class);
List<Row> results = resultDataSet.collect();
TestBaseUtils.compareResultAsText(results, expected);
@@ -204,12 +204,12 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
};
// Execute first query and store results
- Table resultTable1 = tableEnv.sql(query1);
+ Table resultTable1 = tableEnv.sqlQuery(query1);
DataSet<Row> resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class);
List<String> results1 = resultDataSet1.map(mapFunction).collect();
// Execute second query and store results
- Table resultTable2 = tableEnv.sql(query2);
+ Table resultTable2 = tableEnv.sqlQuery(query2);
DataSet<Row> resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class);
List<String> results2 = resultDataSet2.map(mapFunction).collect();
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
index c5a394a..f9693fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
@@ -61,7 +61,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
"(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
"(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
@@ -83,7 +83,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerTable("T", in);
String sqlQuery = "SELECT a, c FROM T";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -106,7 +106,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -123,7 +123,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("AggTable", ds, "x, y, z");
String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -143,7 +143,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -168,7 +168,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
tableEnv.registerDataSet("t1", ds1, "a, b");
String sqlQuery = "SELECT b['foo'] FROM t1";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
index c6368d4..f3d0309 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -68,7 +68,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
tableEnv.registerTable("MyTableRow", in);
String sqlQuery = "SELECT a,c FROM MyTableRow";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -93,7 +93,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
tableEnv.registerTable("MyTable", in);
String sqlQuery = "SELECT * FROM MyTable";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -117,7 +117,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -148,7 +148,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
String sqlQuery = "SELECT * FROM T1 " +
"UNION ALL " +
"(SELECT a, b, c FROM T2 WHERE a < 3)";
- Table result = tableEnv.sql(sqlQuery);
+ Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index c9a7049..dde5569 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -31,7 +31,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
val util = batchTestUtil()
val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
- val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+ val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
val expected = unaryNode(
"DataSetCalc",
@@ -43,7 +43,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
- val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
+ val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
val join = unaryNode(
"DataSetJoin",
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
index 9aada9a..69b12b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
@@ -34,6 +34,6 @@ class CalcValidationTest extends TableTestBase {
val sqlQuery = "SELECT a, foo FROM MyTable"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..ef9e6a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail because table sink schema has too few fields
+ util.tableEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail because types of table sink do not match query result
+ util.tableEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnsupportedPartialInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = util.tableEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+ // must fail because partial insert is not supported yet.
+ util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
index 90bcfec..d9e0e10 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -35,7 +35,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
@Test(expected = classOf[TableException])
@@ -46,7 +46,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[ValidationException])
@@ -57,7 +57,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -68,7 +68,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -79,7 +79,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -90,7 +90,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -101,7 +101,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -112,7 +112,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -123,7 +123,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -134,7 +134,7 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
@@ -145,6 +145,6 @@ class JoinValidationTest extends TableTestBase {
val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
index 7e72a21..dfbdd5a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
@@ -41,7 +41,7 @@ class OverWindowValidationTest extends TableTestBase {
util.addFunction("overAgg", new OverAgg0)
val sqlQuery = "SELECT overAgg(b, a) FROM T"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
/**
@@ -55,6 +55,6 @@ class OverWindowValidationTest extends TableTestBase {
val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
- util.tableEnv.sql(sqlQuery)
+ util.tableEnv.sqlQuery(sqlQuery)
}
}
[2/4] flink git commit: [FLINK-6442] [table] Add registration for
TableSinks and INSERT INTO support for SQL and Table API.
Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
index d3f9b9f..cfc8067 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
@@ -34,6 +34,6 @@ class SortValidationTest extends TableTestBase {
val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
- util.tableEnv.sql(sqlQuery).toDataSet[Row]
+ util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2cfe931
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because TableSink accepts fewer fields.
+ util.tableEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because types of result and TableSink do not match.
+ util.tableEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 0943ea6..1b99679 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -41,7 +41,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
- val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+ val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
val expected = unaryNode(
"DataStreamCalc",
@@ -53,7 +53,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
- val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table2 " +
+ val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table2 " +
s"UNION ALL SELECT a, b, c FROM $table")
val expected2 = binaryNode(
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 640fd26..e066fe4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -206,7 +206,7 @@ class JoinTest extends TableTestBase {
val query =
"SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
- val resultTable = streamUtil.tableEnv.sql(query)
+ val resultTable = streamUtil.tableEnv.sqlQuery(query)
val relNode = resultTable.getRelNode
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val rexNode = joinNode.getCondition
@@ -230,7 +230,7 @@ class JoinTest extends TableTestBase {
query: String,
expectCondStr: String): Unit = {
- val resultTable = streamUtil.tableEnv.sql(query)
+ val resultTable = streamUtil.tableEnv.sqlQuery(query)
val relNode = resultTable.getRelNode
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val joinInfo = joinNode.analyzeCondition
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..3045100
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail because table sink has too few fields.
+ tEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+ // must fail because field types of table sink are incompatible.
+ tEnv.sqlUpdate(sql)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnsupportedPartialInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+ // must fail because we don't support partial insert yet.
+ tEnv.sqlUpdate(sql)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
index 413cca7..d04b6d0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -43,7 +43,7 @@ class OverWindowValidationTest extends TableTestBase {
"sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
+ streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
}
/**
@@ -55,7 +55,7 @@ class OverWindowValidationTest extends TableTestBase {
val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
- streamUtil.tableEnv.sql(sqlQuery)
+ streamUtil.tableEnv.sqlQuery(sqlQuery)
}
/**
@@ -66,6 +66,6 @@ class OverWindowValidationTest extends TableTestBase {
streamUtil.addFunction("overAgg", new OverAgg0)
val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
- streamUtil.tableEnv.sql(sqlQuery)
+ streamUtil.tableEnv.sqlQuery(sqlQuery)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index dbc7d46..f58feed 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -145,7 +145,7 @@ class CorrelateValidationTest extends TableTestBase {
), "Undefined function: NONEXIST")
// SQL API call
expectExceptionThrown(
- util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+ util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
"No match found for function signature nonexist(<NUMERIC>)")
@@ -160,7 +160,7 @@ class CorrelateValidationTest extends TableTestBase {
// SQL API call
// NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
expectExceptionThrown(
- util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+ util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
null,
classOf[AssertionError])
@@ -172,7 +172,7 @@ class CorrelateValidationTest extends TableTestBase {
"Given parameters of function 'FUNC2' do not match any signature")
// SQL API call
expectExceptionThrown(
- util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+ util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
"No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2fcfd6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+ @Test(expected = classOf[ValidationException])
+ def testInconsistentLengthInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because table sink has too few fields.
+ tEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnmatchedTypesInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ // must fail because field types of table sink are incompatible.
+ tEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
index ab87cd3..628925b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
@@ -18,10 +18,12 @@
package org.apache.flink.table.api.validation
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.stream.table.TestAppendSink
+import org.apache.flink.table.utils.MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test
@@ -39,4 +41,27 @@ class TableSinksValidationTest extends TableTestBase {
.writeToSink(new TestAppendSink)
}
+ @Test(expected = classOf[TableException])
+ def testSinkTableRegistrationUsingExistedTableName(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Int, String)]("TargetTable", 'id, 'text)
+
+ val fieldNames = Array("a", "b", "c")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+ // table name already registered
+ util.tableEnv
+ .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRegistrationWithInconsistentFieldNamesAndTypesLength(): Unit = {
+ val util = streamTestUtil()
+
+ // inconsistent length of field names and types
+ val fieldNames = Array("a", "b", "c")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
+
+ util.tableEnv
+ .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 44842f7..dd6e00e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.Future
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql2rel.RelDecorrelator
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index a15f1d1..b4ad9ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -430,7 +430,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+ val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
util.tableEnv.registerTable("NewTable", newTable)
@@ -448,7 +448,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+ val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
util.tableEnv.registerTable("NewTable", newTable)
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 535bbf5..999a808 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -286,7 +286,7 @@ class RetractionRulesTest extends TableTestBase {
class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
def verifySqlTrait(query: String, expected: String): Unit = {
- verifyTableTrait(tableEnv.sql(query), expected)
+ verifyTableTrait(tableEnv.sqlQuery(query), expected)
}
def verifyTableTrait(resultTable: Table, expected: String): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index ab80c65..cfff326 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -275,7 +275,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
- val result = util.tableEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+ val result = util.tableEnv.sqlQuery("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
val expected = unaryNode(
"DataStreamCalc",
@@ -300,7 +300,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
- val result = util.tableEnv.sql("SELECT MIN(proctime) FROM MyTable GROUP BY long")
+ val result = util.tableEnv.sqlQuery("SELECT MIN(proctime) FROM MyTable GROUP BY long")
val expected = unaryNode(
"DataStreamCalc",
@@ -325,7 +325,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
- val result = util.tableEnv.sql(
+ val result = util.tableEnv.sqlQuery(
"SELECT TUMBLE_END(rowtime, INTERVAL '0.1' SECOND) AS `rowtime`, `long`, " +
"SUM(`int`) FROM MyTable " +
"GROUP BY `long`, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
@@ -355,7 +355,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
val util = streamTestUtil()
util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
- val result = util.tableEnv.sql("SELECT MIN(rowtime), long FROM MyTable " +
+ val result = util.tableEnv.sqlQuery("SELECT MIN(rowtime), long FROM MyTable " +
"GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
val expected = unaryNode(
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index 39b8371..465a88c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -52,7 +52,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231,1,21,21,11"
val results = result.toDataSet[Row].collect()
@@ -70,7 +70,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231"
val results = result.toDataSet[Row].collect()
@@ -88,7 +88,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231"
val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class AggregateITCase(
(2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
val results = result.toDataSet[Row].collect()
@@ -128,7 +128,7 @@ class AggregateITCase(
val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,3,2,1,3"
val results = result.toDataSet[Row].collect()
@@ -147,7 +147,7 @@ class AggregateITCase(
val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "5.5,7"
val results = result.toDataSet[Row].collect()
@@ -165,7 +165,7 @@ class AggregateITCase(
val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "2,2"
val results = result.toDataSet[Row].collect()
@@ -187,7 +187,7 @@ class AggregateITCase(
(2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,3,2"
val results = result.toDataSet[Row].collect()
@@ -205,7 +205,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "231,21"
val results = result.toDataSet[Row].collect()
@@ -223,7 +223,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected =
"6,18,6\n5,13,5\n4,8,4\n3,5,3\n2,2,2\n1,1,1"
@@ -243,7 +243,7 @@ class AggregateITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected =
"6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
@@ -280,9 +280,9 @@ class AggregateITCase(
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
- val result2 = tEnv.sql(sqlQuery2)
- val result3 = tEnv.sql(sqlQuery3)
+ val result = tEnv.sqlQuery(sqlQuery)
+ val result2 = tEnv.sqlQuery(sqlQuery2)
+ val result3 = tEnv.sqlQuery(sqlQuery3)
val results = result.toDataSet[Row].collect()
val expected = Seq.empty
@@ -315,7 +315,7 @@ class AggregateITCase(
.map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected = Seq(
"1,1,1,1,1",
"2,2,1,2,2", "2,3,1,2,3",
@@ -348,7 +348,7 @@ class AggregateITCase(
.map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected = Seq(
"1,1,1,1,1","1,1,1,1,1",
"2,5,2,2,2","2,5,2,2,2",
@@ -383,7 +383,7 @@ class AggregateITCase(
.map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
val expected = Seq(
"2,10,39,6,3,7",
"16,21,111,6,6,18"
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 711182c..b891a7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -53,7 +53,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -77,7 +77,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -101,7 +101,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -125,7 +125,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
@@ -146,7 +146,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- tEnv.sql(sqlQuery)
+ tEnv.sqlQuery(sqlQuery)
}
@Test
@@ -160,7 +160,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "\n"
val results = result.toDataSet[Row].collect()
@@ -178,7 +178,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -201,7 +201,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
val results = result.toDataSet[Row].collect()
@@ -219,7 +219,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
"6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
@@ -240,7 +240,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
val results = result.toDataSet[Row].collect()
@@ -258,7 +258,7 @@ class CalcITCase(
val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
"9,4,Comment#3\n" + "17,6,Comment#11\n" +
@@ -281,7 +281,7 @@ class CalcITCase(
Timestamp.valueOf("1984-07-12 14:34:24")))
tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
"1984-07-12,14:34:24,1984-07-12 14:34:24.0"
@@ -300,7 +300,7 @@ class CalcITCase(
val ds = env.fromElements("a", "b", "c")
tEnv.registerDataSet("MyTable", ds, 'text)
- val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+ val result = tEnv.sqlQuery("SELECT hashCode(text) FROM MyTable")
val expected = "97\n98\n99"
val results = result.toDataSet[Row].collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 681b4b5..6a17cb4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -49,7 +49,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
val results = result.toDataSet[Row].collect()
@@ -69,7 +69,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi,Hallo\n"
val results = result.toDataSet[Row].collect()
@@ -89,7 +89,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -130,7 +130,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
"2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -150,7 +150,7 @@ class JoinITCase(
tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "6,6"
val results = result.toDataSet[Row].collect()
@@ -170,7 +170,7 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "6,6"
val results = result.toDataSet[Row].collect()
@@ -196,7 +196,7 @@ class JoinITCase(
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
"null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -218,7 +218,7 @@ class JoinITCase(
"null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
"null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -240,7 +240,7 @@ class JoinITCase(
"null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
"null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -257,7 +257,7 @@ class JoinITCase(
"3,1,1,Hi\n" +
"3,2,2,Hello\n" +
"3,3,2,Hello world"
- val result = tEnv.sql(sqlQuery2).collect()
+ val result = tEnv.sqlQuery(sqlQuery2).collect()
TestBaseUtils.compareResultAsText(result.asJava, expected)
}
@@ -274,7 +274,7 @@ class JoinITCase(
"1,1,Hi,3\n" +
"2,2,Hello,3\n" +
"3,2,Hello world,3"
- val result = tEnv.sql(sqlQuery1).collect()
+ val result = tEnv.sqlQuery(sqlQuery1).collect()
TestBaseUtils.compareResultAsText(result.asJava, expected)
}
@@ -287,7 +287,7 @@ class JoinITCase(
tEnv.registerTable("A", table)
val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
- val result = tEnv.sql(sqlQuery1).count()
+ val result = tEnv.sqlQuery(sqlQuery1).count()
Assert.assertEquals(0, result)
}
@@ -305,7 +305,7 @@ class JoinITCase(
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null",
"2,null", "2,null",
@@ -331,7 +331,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null", "2,null", "2,null", "3,3", "3,3",
"3,3", "4,null", "4,null", "4,null",
@@ -355,7 +355,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,3", "2,3", "2,3", "3,null", "3,null",
"3,null", "4,null", "4,null", "4,null",
@@ -380,7 +380,7 @@ class JoinITCase(
tEnv.registerTable("A", ds2)
tEnv.registerTable("B", ds1)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"2,null", "3,null", "1,null").mkString("\n")
@@ -402,7 +402,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null", "2,null", "2,null", "3,3", "3,3",
@@ -427,7 +427,7 @@ class JoinITCase(
tEnv.registerTable("A", ds1)
tEnv.registerTable("B", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null", "2,null", "2,null", "3,null", "3,null",
@@ -453,7 +453,7 @@ class JoinITCase(
tEnv.registerTable("t1", ds1)
tEnv.registerTable("t2", ds2)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = Seq(
"1,null,null",
"2,null,null", "2,null,null",
@@ -481,7 +481,7 @@ class JoinITCase(
val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
val results = result.toDataSet[Row].collect().toList
@@ -508,7 +508,7 @@ class JoinITCase(
" UNNEST(tf.b) as A (x, y) " +
"WHERE x > a"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = List(
"1,[(12,45.6), (2,45.612)],12,45.6",
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
index b0e6fe8..d965e0c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
@@ -52,7 +52,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -72,7 +72,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -94,7 +94,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hallo\n"
val results = result.toDataSet[Row].collect()
@@ -115,7 +115,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "18"
val results = result.toDataSet[Row].collect()
@@ -135,7 +135,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -161,7 +161,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'c)
tEnv.registerDataSet("t2", ds2, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1\n1"
val results = result.toDataSet[Row].collect()
@@ -183,7 +183,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n"
val results = result.toDataSet[Row].collect()
@@ -208,7 +208,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hi\n" + "Hello\n"
val results = result.toDataSet[Row].collect()
@@ -234,7 +234,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'c)
tEnv.registerDataSet("t2", ds2, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "1\n2\n2"
val results = result.toDataSet[Row].collect()
@@ -254,7 +254,7 @@ class SetOperatorsITCase(
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
@@ -271,7 +271,7 @@ class SetOperatorsITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
+ val result = tEnv.sqlQuery("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
val expected = Seq("1", "2", "2", "3", "3", "3").mkString("\n")
val results = result.toDataSet[Row].collect()
@@ -288,7 +288,7 @@ class SetOperatorsITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- val result = tEnv.sql("SELECT d IN (SELECT a FROM Table3) FROM Table5")
+ val result = tEnv.sqlQuery("SELECT d IN (SELECT a FROM Table3) FROM Table5")
val expected = Seq("false", "false", "false", "false", "false", "false", "false",
"false", "false", "true", "true", "true", "true", "true", "true").mkString("\n")
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
index 4672ec3..66943fc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
@@ -62,7 +62,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
@@ -99,7 +99,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
@@ -130,7 +130,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
@@ -161,7 +161,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
// squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
// the rows need to be copied in object reuse mode
val copied = new mutable.ArrayBuffer[Row]
rows.foreach(r => copied += Row.copy(r))
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index b7f1bb1..be8278f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -24,8 +24,10 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.MemoryTableSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -48,7 +50,7 @@ class TableEnvironmentITCase(
val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
- val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+ val result = tEnv.sqlQuery(sqlQuery).select('a.avg, 'b.sum, 'c.count)
val expected = "15,65,12"
val results = result.toDataSet[Row].collect()
@@ -68,7 +70,7 @@ class TableEnvironmentITCase(
val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
- val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+ val result = tEnv.sqlQuery(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
val expected = "16,60,12"
val results = result.toDataSet[Row].collect()
@@ -85,11 +87,11 @@ class TableEnvironmentITCase(
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
- val result1 = tEnv.sql(sqlQuery)
+ val result1 = tEnv.sqlQuery(sqlQuery)
tEnv.registerTable("ResTable", result1)
val sqlQuery2 = "SELECT count(aa) FROM ResTable"
- val result2 = tEnv.sql(sqlQuery2)
+ val result2 = tEnv.sqlQuery(sqlQuery2)
val expected = "6"
val results = result2.toDataSet[Row].collect()
@@ -106,11 +108,34 @@ class TableEnvironmentITCase(
val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
tEnv.registerTable("MyTable", ds)
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello,true\n"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ @Test
+ def testInsertIntoMemoryTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSinkUtil.clear
+
+ val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+ tEnv.sqlUpdate(sql)
+ env.execute()
+
+ val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 504ab90..187d096 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -44,7 +44,7 @@ class TableSourceITCase(
val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.registerTableSource("csvTable", csvTable)
- val results = tEnv.sql(
+ val results = tEnv.sqlQuery(
"SELECT id, `first`, `last`, score FROM csvTable").collect()
val expected = Seq(
@@ -67,7 +67,7 @@ class TableSourceITCase(
tableEnv.registerTableSource("NestedPersons", nestedTable)
- val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+ val result = tableEnv.sqlQuery("SELECT NestedPersons.firstName, NestedPersons.lastName," +
"NestedPersons.address.street, NestedPersons.address.city AS city " +
"FROM NestedPersons " +
"WHERE NestedPersons.address.city LIKE 'Dublin'").collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 116f690..e947c3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -446,7 +446,7 @@ class CalcITCase(
val sqlQuery = "SELECT c FROM t1 where RichFunc2(c)='ABC#Hello'"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello"
val results = result.toDataSet[Row].collect()
@@ -467,7 +467,7 @@ class CalcITCase(
val sqlQuery = "SELECT c FROM t1 where RichFunc3(c)=true"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello"
val results = result.toDataSet[Row].collect()
@@ -488,7 +488,7 @@ class CalcITCase(
val sqlQuery = "SELECT c FROM t1 where " +
"RichFunc2(c)='Abc#Hello' or RichFunc1(a)=3 and b=2"
- val result = tEnv.sql(sqlQuery)
+ val result = tEnv.sqlQuery(sqlQuery)
val expected = "Hello\nHello world"
val results = result.toDataSet[Row].collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 2e23161..725c580 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -26,8 +26,10 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.utils.MemoryTableSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -162,6 +164,28 @@ class TableEnvironmentITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ @Test
+ def testInsertIntoMemoryTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSinkUtil.clear
+
+ val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f")
+ val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ tEnv.scan("sourceTable")
+ .select('a, 'b, 'c)
+ .insertInto("targetTable")
+ env.execute()
+
+ val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ }
}
object TableEnvironmentITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 24d8695..ec65cf7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
tEnv.registerTable("MyTable", table)
- val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
+ val t = tEnv.sqlQuery("SELECT COUNT(`rowtime`) FROM MyTable " +
"GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
val results = t.toAppendStream[Row]
@@ -292,7 +292,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
tEnv.registerTable("T1", table)
val querySql = "select rowtime as ts, string as msg from T1"
- val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+ val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
results.addSink(new StreamITCase.StringSink[Pojo1])
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index ab7925b..e40da7a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -60,7 +60,7 @@ class JoinITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
@@ -97,7 +97,7 @@ class JoinITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
index cc47a69..4884513 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -110,7 +110,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" MIN(c) OVER (" +
" ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -153,7 +153,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -181,7 +181,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
"as cnt1 from T1)"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -215,7 +215,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row](queryConfig)
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -240,7 +240,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -302,7 +302,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
" FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -363,7 +363,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
"FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -431,7 +431,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
" FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -492,7 +492,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
"FROM T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -553,7 +553,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -619,7 +619,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -681,7 +681,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -742,7 +742,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -814,7 +814,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 2c59f8c..19db2a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -87,7 +87,7 @@ class SortITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC "
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 5398c6d..2c82d9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -18,16 +18,21 @@
package org.apache.flink.table.runtime.stream.sql
+import java.util
+
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+
+import scala.collection.JavaConverters._
import org.junit.Assert._
import org.junit._
@@ -58,7 +63,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = ds.toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTableRow", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -79,7 +84,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
env.execute()
@@ -100,7 +105,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -121,7 +126,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -142,7 +147,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env)
tEnv.registerDataStream("MyTable", t)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -166,7 +171,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -193,7 +198,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -219,7 +224,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.get3TupleDataStream(env)
tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -244,7 +249,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -276,7 +281,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -306,7 +311,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -365,5 +370,33 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
-}
+ @Test
+ def testInsertIntoMemoryTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSinkUtil.clear
+ val t = StreamTestData.getSmall3TupleDataStream(env)
+ .assignAscendingTimestamps(x => x._2)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f", "t")
+ val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+ .asInstanceOf[Array[TypeInformation[_]]]
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
+ tEnv.sqlUpdate(sql)
+ env.execute()
+
+ val expected = List(
+ "1,1,Hi,1970-01-01 00:00:00.001",
+ "2,2,Hello,1970-01-01 00:00:00.002",
+ "3,2,Hello world,1970-01-01 00:00:00.002")
+ assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index be876a8..30ada56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -43,7 +43,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
tEnv.registerTableSource("persons", csvTable)
- tEnv.sql(
+ tEnv.sqlQuery(
"SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
.toAppendStream[Row]
.addSink(new StreamITCase.StringSink[Row])
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 830359f..c5b82fe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.stream.table
import java.io.File
import java.lang.{Boolean => JBool}
-import java.sql.Timestamp
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -37,6 +36,7 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
import org.apache.flink.table.sinks._
+import org.apache.flink.table.utils.MemoryTableSinkUtil
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
@@ -49,6 +49,36 @@ import scala.collection.JavaConverters._
class TableSinkITCase extends StreamingMultipleProgramsTestBase {
@Test
+ def testInsertIntoRegisteredTableSink(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSinkUtil.clear
+
+ val input = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(r => r._2)
+ val fieldNames = Array("d", "e", "t")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+ val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+ .where('a < 3 || 'a > 19)
+ .select('c, 't, 'b)
+ .insertInto("targetTable")
+ env.execute()
+
+ val expected = Seq(
+ "Hi,1970-01-01 00:00:00.001,1",
+ "Hello,1970-01-01 00:00:00.002,2",
+ "Comment#14,1970-01-01 00:00:00.006,6",
+ "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+ TestBaseUtils.compareResultAsText(MemoryTableSinkUtil.results.asJava, expected)
+ }
+
+ @Test
def testStreamTableSink(): Unit = {
val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
new file mode 100644
index 0000000..29b2e94
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+object MemoryTableSinkUtil {
+ var results: mutable.MutableList[String] = mutable.MutableList.empty[String]
+
+ def clear = {
+ MemoryTableSinkUtil.results.clear()
+ }
+
+ final class UnsafeMemoryAppendTableSink
+ extends TableSinkBase[Row] with BatchTableSink[Row]
+ with AppendStreamTableSink[Row] {
+
+ override def getOutputType: TypeInformation[Row] = {
+ new RowTypeInfo(getFieldTypes, getFieldNames)
+ }
+
+ override protected def copy: TableSinkBase[Row] = {
+ new UnsafeMemoryAppendTableSink
+ }
+
+ override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+ dataSet.output(new MemoryCollectionOutputFormat)
+ }
+
+ override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+ dataStream.addSink(new MemoryAppendSink)
+ }
+ }
+
+ private class MemoryAppendSink extends RichSinkFunction[Row]() {
+
+ override def invoke(value: Row): Unit = {
+ results.synchronized {
+ results += value.toString
+ }
+ }
+ }
+
+ private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
+
+ override def configure(parameters: Configuration): Unit = {}
+
+ override def open(taskNumber: Int, numTasks: Int): Unit = {}
+
+ override def writeRecord(record: Row): Unit = {
+ results.synchronized {
+ results += record.toString
+ }
+ }
+
+ override def close(): Unit = {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c4e2433..ff7c79d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.utils
import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
@@ -40,4 +41,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
+ override def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit = ???
}
[4/4] flink git commit: [FLINK-7357] [table] Fix translation of group
window queries with window props and HAVING.
Posted by fh...@apache.org.
[FLINK-7357] [table] Fix translation of group window queries with window props and HAVING.
This closes #4521.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df5efe9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df5efe9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df5efe9c
Branch: refs/heads/master
Commit: df5efe9cead172994abb2fd4858a27caacd9468c
Parents: 73a2443
Author: Rong Rong <ro...@uber.com>
Authored: Thu Aug 10 10:46:25 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 20 10:12:13 2017 +0200
----------------------------------------------------------------------
.../flink/table/plan/rules/FlinkRuleSets.scala | 4 +-
.../common/WindowStartEndPropertiesRule.scala | 169 ++++++++++++-------
.../table/api/batch/sql/GroupWindowTest.scala | 41 +++++
.../table/api/stream/sql/GroupWindowTest.scala | 38 +++++
.../table/runtime/stream/sql/SqlITCase.scala | 51 ++++++
5 files changed, 241 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a81c7d2..073a8cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -147,7 +147,8 @@ object FlinkRuleSets {
// Transform window to LogicalWindowAggregate
DataSetLogicalWindowAggregateRule.INSTANCE,
- WindowStartEndPropertiesRule.INSTANCE
+ WindowStartEndPropertiesRule.INSTANCE,
+ WindowStartEndPropertiesHavingRule.INSTANCE
)
/**
@@ -179,6 +180,7 @@ object FlinkRuleSets {
// Transform window to LogicalWindowAggregate
DataStreamLogicalWindowAggregateRule.INSTANCE,
WindowStartEndPropertiesRule.INSTANCE,
+ WindowStartEndPropertiesHavingRule.INSTANCE,
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 14e9b21..33190e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -18,20 +18,19 @@
package org.apache.flink.table.plan.rules.common
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import scala.collection.JavaConversions._
-class WindowStartEndPropertiesRule
- extends RelOptRule(
- WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
- "WindowStartEndPropertiesRule") {
+abstract class WindowStartEndPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String)
+ extends RelOptRule(rulePredicate, ruleName) {
override def matches(call: RelOptRuleCall): Boolean = {
val project = call.rel(0).asInstanceOf[LogicalProject]
@@ -49,61 +48,24 @@ class WindowStartEndPropertiesRule
project.getProjects.exists(hasGroupAuxiliaries)
}
- override def onMatch(call: RelOptRuleCall): Unit = {
-
- val project = call.rel(0).asInstanceOf[LogicalProject]
- val innerProject = call.rel(1).asInstanceOf[LogicalProject]
- val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
-
- // Retrieve window start and end properties
- val transformed = call.builder()
- val rexBuilder = transformed.getRexBuilder
- transformed.push(LogicalWindowAggregate.create(
- agg.getWindow,
- Seq(
- NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
- NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
- ), agg)
- )
-
- // forward window start and end properties
- transformed.project(
- innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
-
- def replaceGroupAuxiliaries(node: RexNode): RexNode = {
- node match {
- case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
- // replace expression by access to window start
- rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
- case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
- // replace expression by access to window end
- rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
- case c: RexCall =>
- // replace expressions in children
- val newOps = c.getOperands.map(replaceGroupAuxiliaries)
- c.clone(c.getType, newOps)
- case x =>
- // preserve expression
- x
- }
+ def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode = {
+ val rexBuilder = relBuilder.getRexBuilder
+ node match {
+ case c: RexCall if isWindowStart(c) =>
+ // replace expression by access to window start
+ rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false)
+ case c: RexCall if isWindowEnd(c) =>
+ // replace expression by access to window end
+ rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false)
+ case c: RexCall =>
+ // replace expressions in children
+ val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, relBuilder))
+ c.clone(c.getType, newOps)
+ case x =>
+ // preserve expression
+ x
}
-
- // replace window auxiliary function by access to window properties
- transformed.project(
- project.getProjects.map(replaceGroupAuxiliaries)
- )
- val res = transformed.build()
- call.transformTo(res)
}
-}
-
-object WindowStartEndPropertiesRule {
- private val WINDOW_EXPRESSION_RULE_PREDICATE =
- RelOptRule.operand(classOf[LogicalProject],
- RelOptRule.operand(classOf[LogicalProject],
- RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
-
- val INSTANCE = new WindowStartEndPropertiesRule
/** Checks if a RexNode is a window start auxiliary function. */
private def isWindowStart(node: RexNode): Boolean = {
@@ -113,7 +75,7 @@ object WindowStartEndPropertiesRule {
case SqlStdOperatorTable.TUMBLE_START |
SqlStdOperatorTable.HOP_START |
SqlStdOperatorTable.SESSION_START
- => true
+ => true
case _ => false
}
case _ => false
@@ -128,10 +90,95 @@ object WindowStartEndPropertiesRule {
case SqlStdOperatorTable.TUMBLE_END |
SqlStdOperatorTable.HOP_END |
SqlStdOperatorTable.SESSION_END
- => true
+ => true
case _ => false
}
case _ => false
}
}
}
+
+object WindowStartEndPropertiesRule {
+
+ val INSTANCE = new WindowStartEndPropertiesBaseRule(
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))),
+ "WindowStartEndPropertiesRule") {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+
+ val project = call.rel(0).asInstanceOf[LogicalProject]
+ val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+ val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+ // Retrieve window start and end properties
+ val builder = call.builder()
+ builder.push(LogicalWindowAggregate.create(
+ agg.getWindow,
+ Seq(
+ NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+ NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+ agg)
+ )
+
+ // forward window start and end properties
+ builder.project(
+ innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+ // replace window auxiliary function by access to window properties
+ builder.project(
+ project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+ )
+ val res = builder.build()
+ call.transformTo(res)
+ }
+ }
+}
+
+object WindowStartEndPropertiesHavingRule {
+
+ val INSTANCE = new WindowStartEndPropertiesBaseRule(
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalFilter],
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))),
+ "WindowStartEndPropertiesHavingRule") {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+
+ val project = call.rel(0).asInstanceOf[LogicalProject]
+ val filter = call.rel(1).asInstanceOf[LogicalFilter]
+ val innerProject = call.rel(2).asInstanceOf[LogicalProject]
+ val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate]
+
+ // Retrieve window start and end properties
+ val builder = call.builder()
+ builder.push(LogicalWindowAggregate.create(
+ agg.getWindow,
+ Seq(
+ NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+ NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+ agg)
+ )
+
+ // forward window start and end properties
+ builder.project(
+ innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+ // replace window auxiliary function by access to window properties
+ builder.filter(
+ filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder))
+ )
+
+ // replace window auxiliary function by access to window properties
+ builder.project(
+ project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+ )
+
+ val res = builder.build()
+ call.transformTo(res)
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index e77087c..a78aa8c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -256,4 +256,45 @@ class GroupWindowTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}
+
+ @Test
+ def testExpressionOnWindowHavingFunction() = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+ val sql =
+ "SELECT " +
+ " COUNT(*), " +
+ " HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "FROM T " +
+ "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "HAVING " +
+ " SUM(a) > 0 AND " +
+ " QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "ts, a")
+ ),
+ term("window", SlidingGroupWindow('w$, 'ts, 60000.millis, 900000.millis)),
+ term("select",
+ "COUNT(*) AS EXPR$0",
+ "SUM(a) AS $f1",
+ "start('w$) AS w$start",
+ "end('w$) AS w$end")
+ ),
+ term("select", "EXPR$0", "CAST(w$start) AS w$start"),
+ term("where",
+ "AND(>($f1, 0), " +
+ "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(CAST(w$start)), 86400000)), 1))")
+ )
+
+ util.verifySql(sql, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index 4823903..722c4f0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -143,4 +143,42 @@ class GroupWindowTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
+
+ @Test
+ def testExpressionOnWindowHavingFunction() = {
+ val sql =
+ "SELECT " +
+ " COUNT(*), " +
+ " HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "FROM MyTable " +
+ "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "HAVING " +
+ " SUM(a) > 0 AND " +
+ " QUARTER(HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "rowtime, a")
+ ),
+ term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)),
+ term("select",
+ "COUNT(*) AS EXPR$0",
+ "SUM(a) AS $f1",
+ "start('w$) AS w$start",
+ "end('w$) AS w$end")
+ ),
+ term("select", "EXPR$0", "w$start"),
+ term("where",
+ "AND(>($f1, 0), " +
+ "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(w$start), 86400000)), 1))")
+ )
+
+ streamUtil.verifySql(sql, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index d2f9a9a..5398c6d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -21,9 +21,11 @@ package org.apache.flink.table.runtime.stream.sql
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.junit.Assert._
@@ -314,5 +316,54 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testHopStartEndWithHaving(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQueryHopStartEndWithHaving =
+ """
+ |SELECT
+ | c AS k,
+ | COUNT(a) AS v,
+ | HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart,
+ | HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd
+ |FROM T1
+ |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c
+ |HAVING
+ | SUM(b) > 1 AND
+ | QUARTER(HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)) = 1
+ """.stripMargin
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Right(14000010L),
+ Left(8640000000L, (4, 1L, "Hello")), // data for the quarter to validate having filter
+ Left(8640000001L, (4, 1L, "Hello")),
+ Right(8640000010L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row]
+ resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row])
+
+ env.execute()
+
+ val expected = List(
+ "Hello,2,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
}