Posted to commits@flink.apache.org by va...@apache.org on 2016/04/21 15:34:51 UTC
[1/2] flink git commit: [FLINK-3727] Add support for embedded
streaming SQL (projection, filter, union)
Repository: flink
Updated Branches:
refs/heads/master 7478cba17 -> 4b06b01ea
[FLINK-3727] Add support for embedded streaming SQL (projection, filter, union)
- add methods to register DataStreams
- add sql translation method in StreamTableEnvironment
- add a custom rule to convert to streamable table
- add docs for streaming table and sql
This closes #1917
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51ff4a08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51ff4a08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51ff4a08
Branch: refs/heads/master
Commit: 51ff4a0867179d8b0e6c4dc07df46f871063b511
Parents: 7478cba
Author: vasia <va...@apache.org>
Authored: Fri Apr 15 13:35:24 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Apr 21 14:42:58 2016 +0200
----------------------------------------------------------------------
docs/apis/batch/libs/table.md | 41 ++++-
.../api/java/table/StreamTableEnvironment.scala | 48 ++++-
.../scala/table/StreamTableEnvironment.scala | 45 ++++-
.../api/table/StreamTableEnvironment.scala | 44 ++++-
.../flink/api/table/TableEnvironment.scala | 30 +++-
.../rules/EnumerableToLogicalTableScan.scala | 1 -
.../api/table/plan/rules/FlinkRuleSets.scala | 58 ++++---
.../plan/rules/LogicalScanToStreamable.scala | 56 ++++++
.../api/table/plan/schema/DataStreamTable.scala | 1 -
.../table/plan/schema/TransStreamTable.scala | 78 +++++++++
.../flink/api/java/sql/test/BatchSQLITCase.java | 121 +++++++++++++
.../api/java/sql/test/StreamingSQLITCase.java | 121 +++++++++++++
.../java/table/test/utils/StreamTestData.java | 64 +++++++
.../sql/streaming/test/StreamSQLITCase.scala | 173 +++++++++++++++++++
.../table/streaming/test/FilterITCase.scala | 2 -
.../table/streaming/test/SelectITCase.scala | 2 +-
.../streaming/test/utils/StreamITCase.scala | 18 +-
.../streaming/test/utils/StreamTestData.scala | 4 -
18 files changed, 846 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/docs/apis/batch/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md
index 5b1280c..7b41cd3 100644
--- a/docs/apis/batch/libs/table.md
+++ b/docs/apis/batch/libs/table.md
@@ -59,16 +59,18 @@ Note that the Table API is currently not part of the binary distribution. See li
Table API
----------
-The Table API provides methods to apply relational operations on DataSets, both in Scala and Java.
+The Table API provides methods to apply relational operations on DataSets and DataStreams, both in Scala and Java.
-The central concept of the Table API is a `Table` which represents a table with relational schema (or relation). Tables can be created from a `DataSet`, converted into a `DataSet`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments.
+The central concept of the Table API is a `Table` which represents a table with relational schema (or relation). Tables can be created from a `DataSet` or `DataStream`, converted into a `DataSet` or `DataStream`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments.
-The following sections show by example how to use the Table API embedded in the Scala and Java DataSet APIs.
+*Note that the only operations currently supported on streaming Tables are selection, filtering, and union.*
+
+The following sections show by example how to use the Table API embedded in the Scala and Java DataSet APIs.
### Scala Table API
The Table API is enabled by importing `org.apache.flink.api.scala.table._`. This enables
-implicit conversions to convert a DataSet to a Table. The following example shows:
+implicit conversions to convert a `DataSet` or `DataStream` to a Table. The following example shows:
- how a `DataSet` is converted to a `Table`,
- how relational queries are specified, and
@@ -112,6 +114,23 @@ val joined = input1.join(input2)
Notice, how the field names of a Table can be changed with `as()` or specified with `toTable()` when converting a DataSet to a Table. In addition, the example shows how to use Strings to specify relational expressions.
+Creating a `Table` from a `DataStream` works in a similar way.
+The following example shows how to convert a `DataStream` to a `Table` and filter it with the Table API.
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.api.scala.table._
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val inputStream = env.addSource(...)
+val result = inputStream
+ .toTable(tEnv, 'a, 'b, 'c)
+ .filter('a === 3)
+val resultStream = result.toDataStream[Row]
+{% endhighlight %}
+
Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a description of the expression syntax.
{% top %}
@@ -419,8 +438,8 @@ Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A `LO
SQL
----
The Table API also supports embedded SQL queries.
-In order to use a `Table` or `DataSet` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
-A registered `Table` can be retrieved back from the `TableEnvironment` using the `scan()` method:
+In order to use a `Table`, `DataSet`, or `DataStream` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
+A registered DataSet `Table` can be retrieved from the `TableEnvironment` using the `scan()` method, and a registered DataStream `Table` can be retrieved using the `ingest()` method.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -452,9 +471,11 @@ val t = tableEnv.scan("MyTable")
</div>
</div>
-*Note: Table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern, as this is reserved for internal use only.*
+A DataStream `Table` can be registered in the `StreamTableEnvironment` using the corresponding `registerDataStream` method.
+
+*Note: DataSet Table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern and DataStream Table names are not allowed to follow the `^_DataStreamTable_[0-9]+` pattern, as these are reserved for internal use only.*
-When registering a `DataSet`, one can also specify the field names of the table:
+When registering a `DataSet` or `DataStream`, one can also specify the field names of the table:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -492,7 +513,7 @@ tableEnv.registerTable("Orders", t)
</div>
</div>
-Registered tables can be used in SQL queries. A SQL query is defined using the `sql()` method of the `TableEnvironment`. It returns a new `Table` which can be converted back to a `DataSet` or used in subsequent Table API queries.
+Registered tables can be used in SQL queries. A SQL query is defined using the `sql()` method of the `TableEnvironment`. It returns a new `Table` which can be converted back to a `DataSet` or `DataStream`, or used in subsequent Table API queries.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -524,6 +545,8 @@ val result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10")
</div>
</div>
+SQL queries can be executed on DataStream Tables by adding the `STREAM` SQL keyword before the table name. Please refer to the [Apache Calcite SQL Streaming documentation](https://calcite.apache.org/docs/stream.html) for more information on the Streaming SQL syntax.
+
{% top %}
Runtime Configuration
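The note in the table.md changes above reserves two name patterns for internal use. As a rough illustration of what such a check could look like, here is a minimal stand-alone sketch in plain Java. The class and method names are hypothetical and this is not Flink's `checkValidTableName`; anchoring the patterns with `$` (so that the whole name must match) is also an assumption.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: illustrates the reserved-name rule only.
final class ReservedTableNames {

    // Patterns quoted from the documentation note; the trailing '$' is an assumption.
    private static final Pattern DATASET_NAME =
        Pattern.compile("^_DataSetTable_[0-9]+$");
    private static final Pattern DATASTREAM_NAME =
        Pattern.compile("^_DataStreamTable_[0-9]+$");

    // Returns true if the name collides with one of the internally generated names.
    static boolean isReserved(String name) {
        return DATASET_NAME.matcher(name).matches()
            || DATASTREAM_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isReserved("_DataStreamTable_7")); // reserved form
        System.out.println(isReserved("Orders"));             // ordinary user name
    }
}
```

Under these assumptions a user-chosen name such as `Orders` passes, while names of the generated form are rejected before registration.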
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 980e45b..2faf13d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -58,7 +58,7 @@ class StreamTableEnvironment(
def fromDataStream[T](dataStream: DataStream[T]): Table = {
val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream)
+ registerDataStreamInternal(name, dataStream, false)
ingest(name)
}
@@ -83,11 +83,55 @@ class StreamTableEnvironment(
.toArray
val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream, exprs)
+ registerDataStreamInternal(name, dataStream, exprs, false)
ingest(name)
}
/**
+ * Registers the given [[DataStream]] as table in the
+ * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * The field names of the [[Table]] are automatically derived
+ * from the type of the [[DataStream]].
+ *
+ * @param name The name under which the [[DataStream]] is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register.
+ * @tparam T The type of the [[DataStream]] to register.
+ */
+ def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+
+ checkValidTableName(name)
+ registerDataStreamInternal(name, dataStream, true)
+ }
+
+ /**
+ * Registers the given [[DataStream]] as table with specified field names in the
+ * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * Example:
+ *
+ * {{{
+ * DataStream<Tuple2<String, Long>> set = ...
+ * tableEnv.registerDataStream("myTable", set, "a, b")
+ * }}}
+ *
+ * @param name The name under which the [[DataStream]] is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register.
+ * @param fields The field names of the registered table.
+ * @tparam T The type of the [[DataStream]] to register.
+ */
+ def registerDataStream[T](name: String, dataStream: DataStream[T], fields: String): Unit = {
+ val exprs = ExpressionParser
+ .parseExpressionList(fields)
+ .toArray
+
+ checkValidTableName(name)
+ registerDataStreamInternal(name, dataStream, exprs, true)
+ }
+
+ /**
* Converts the given [[Table]] into a [[DataStream]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index 48de953..1bbfaaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -19,7 +19,6 @@ package org.apache.flink.api.scala.table
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.table.{Row, TableConfig, Table}
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
@@ -60,7 +59,7 @@ class StreamTableEnvironment(
def fromDataStream[T](dataStream: DataStream[T]): Table = {
val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream.javaStream)
+ registerDataStreamInternal(name, dataStream.javaStream, false)
ingest(name)
}
@@ -82,11 +81,51 @@ class StreamTableEnvironment(
def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
+ registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, false)
ingest(name)
}
/**
+ * Registers the given [[DataStream]] as table in the
+ * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * The field names of the [[Table]] are automatically derived from the type of the [[DataStream]].
+ *
+ * @param name The name under which the [[DataStream]] is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register.
+ * @tparam T The type of the [[DataStream]] to register.
+ */
+ def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+
+ checkValidTableName(name)
+ registerDataStreamInternal(name, dataStream.javaStream, true)
+ }
+
+ /**
+ * Registers the given [[DataStream]] as table with specified field names in the
+ * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * Example:
+ *
+ * {{{
+ * val set: DataStream[(String, Long)] = ...
+ * tableEnv.registerDataStream("myTable", set, 'a, 'b)
+ * }}}
+ *
+ * @param name The name under which the [[DataStream]] is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register.
+ * @param fields The field names of the registered table.
+ * @tparam T The type of the [[DataStream]] to register.
+ */
+ def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = {
+
+ checkValidTableName(name)
+ registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, true)
+ }
+
+ /**
* Converts the given [[Table]] into a [[DataStream]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 8724b5a..48f33f8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.DataStreamTable
+import org.apache.flink.api.table.plan.schema.{TransStreamTable, DataStreamTable}
import org.apache.flink.streaming.api.datastream.DataStream
/**
@@ -103,7 +103,15 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
*/
override def sql(query: String): Table = {
- ???
+ val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
+ // parse the sql query
+ val parsed = planner.parse(query)
+ // validate the sql query
+ val validated = planner.validate(parsed)
+ // transform to a relational tree
+ val relational = planner.rel(validated)
+
+ new Table(relational.rel, this)
}
/**
@@ -112,11 +120,14 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
*
* @param name The name under which the table is registered in the catalog.
* @param dataStream The [[DataStream]] to register as table in the catalog.
+ * @param wrapper True if the registration has to wrap the datastreamTable
+ * into a [[org.apache.calcite.schema.StreamableTable]]
* @tparam T the type of the [[DataStream]].
*/
protected def registerDataStreamInternal[T](
name: String,
- dataStream: DataStream[T]): Unit = {
+ dataStream: DataStream[T],
+ wrapper: Boolean): Unit = {
val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
val dataStreamTable = new DataStreamTable[T](
@@ -124,7 +135,16 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
fieldIndexes,
fieldNames
)
- registerTableInternal(name, dataStreamTable)
+ // when registering a DataStream, we need to wrap it into a StreamableTable
+ // so that the SQL validation phase won't fail
+ if (wrapper) {
+ registerTableInternal(name, dataStreamTable)
+ val t = ingest(name)
+ replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true))
+ }
+ else {
+ registerTableInternal(name, dataStreamTable)
+ }
}
/**
@@ -134,12 +154,15 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
* @param name The name under which the table is registered in the catalog.
* @param dataStream The [[DataStream]] to register as table in the catalog.
* @param fields The field expressions to define the field names of the table.
+ * @param wrapper True if the registration has to wrap the datastreamTable
+ * into a [[org.apache.calcite.schema.StreamableTable]]
* @tparam T The type of the [[DataStream]].
*/
protected def registerDataStreamInternal[T](
name: String,
dataStream: DataStream[T],
- fields: Array[Expression]): Unit = {
+ fields: Array[Expression],
+ wrapper: Boolean): Unit = {
val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray)
val dataStreamTable = new DataStreamTable[T](
@@ -147,7 +170,16 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
fieldIndexes.toArray,
fieldNames.toArray
)
- registerTableInternal(name, dataStreamTable)
+ // when registering a DataStream, we need to wrap it into a StreamableTable
+ // so that the SQL validation phase won't fail
+ if (wrapper) {
+ registerTableInternal(name, dataStreamTable)
+ val t = ingest(name)
+ replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true))
+ }
+ else {
+ registerTableInternal(name, dataStreamTable)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 77830ca..2c166b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -37,7 +37,7 @@ import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTa
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.TableTable
+import org.apache.flink.api.table.plan.schema.{TransStreamTable, TableTable}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
@@ -98,8 +98,32 @@ abstract class TableEnvironment(val config: TableConfig) {
checkValidTableName(name)
- val tableTable = new TableTable(table.getRelNode)
- registerTableInternal(name, tableTable)
+ table.tableEnv match {
+ case e: BatchTableEnvironment =>
+ val tableTable = new TableTable(table.getRelNode)
+ registerTableInternal(name, tableTable)
+ case e: StreamTableEnvironment =>
+ val sTableTable = new TransStreamTable(table.getRelNode, true)
+ tables.add(name, sTableTable)
+ }
+
+ }
+
+ /**
+ * Replaces a registered Table with another Table under the same name.
+ * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]]
+ * with a [[org.apache.calcite.schema.TranslatableTable]].
+ *
+ * @param name
+ * @param table
+ */
+ protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
+
+ if (isRegistered(name)) {
+ tables.add(name, table)
+ } else {
+ throw new TableException(s"Table \'$name\' is not registered.")
+ }
}
/**
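The `replaceRegisteredTable` method in the TableEnvironment.scala changes above only overwrites an entry that already exists and fails otherwise. The following plain-Java sketch models that contract with a `HashMap` standing in for the Calcite schema. All names here are hypothetical, and the assumption that a first-time registration rejects duplicate names is not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the table catalog; not Flink or Calcite code.
final class TableCatalogSketch {

    private final Map<String, Object> tables = new HashMap<>();

    // First-time registration; rejecting duplicates is an assumption.
    void register(String name, Object table) {
        if (tables.containsKey(name)) {
            throw new IllegalArgumentException("Table '" + name + "' already exists.");
        }
        tables.put(name, table);
    }

    // Replacement is only allowed for names that are already registered.
    void replace(String name, Object table) {
        if (!tables.containsKey(name)) {
            throw new IllegalStateException("Table '" + name + "' is not registered.");
        }
        tables.put(name, table);
    }

    Object lookup(String name) {
        return tables.get(name);
    }

    public static void main(String[] args) {
        TableCatalogSketch catalog = new TableCatalogSketch();
        catalog.register("MyTable", "plain table");
        catalog.replace("MyTable", "streamable wrapper");
        System.out.println(catalog.lookup("MyTable"));
    }
}
```

This mirrors the two-step flow in `registerDataStreamInternal` when `wrapper` is true: the plain table is registered first, then swapped for its wrapped form under the same name.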
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
index 02d2159..ee515c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
@@ -18,7 +18,6 @@
package org.apache.flink.api.table.plan.rules
-
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.adapter.enumerable.EnumerableTableScan
import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index e9bcaa2..705bd83 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -19,9 +19,12 @@
package org.apache.flink.api.table.plan.rules
import org.apache.calcite.rel.rules._
+import org.apache.calcite.rel.stream.StreamRules
import org.apache.calcite.tools.{RuleSets, RuleSet}
import org.apache.flink.api.table.plan.rules.dataSet._
import org.apache.flink.api.table.plan.rules.datastream._
+import org.apache.flink.api.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule}
+import scala.collection.JavaConversions._
object FlinkRuleSets {
@@ -105,36 +108,43 @@ object FlinkRuleSets {
/**
* RuleSet to optimize plans for batch / DataSet execution
*/
- val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
+ val DATASTREAM_OPT_RULES: RuleSet = {
- // calc rules
- FilterToCalcRule.INSTANCE,
- ProjectToCalcRule.INSTANCE,
- FilterCalcMergeRule.INSTANCE,
- ProjectCalcMergeRule.INSTANCE,
- CalcMergeRule.INSTANCE,
+ val rules = List(
- // prune empty results rules
- PruneEmptyRules.FILTER_INSTANCE,
- PruneEmptyRules.PROJECT_INSTANCE,
- PruneEmptyRules.UNION_INSTANCE,
+ EnumerableToLogicalTableScan.INSTANCE,
+ LogicalScanToStreamable.INSTANCE,
- // simplify expressions rules
- ReduceExpressionsRule.CALC_INSTANCE,
+ // calc rules
+ FilterToCalcRule.INSTANCE,
+ ProjectToCalcRule.INSTANCE,
+ FilterCalcMergeRule.INSTANCE,
+ ProjectCalcMergeRule.INSTANCE,
+ CalcMergeRule.INSTANCE,
- // push and merge projection rules
- ProjectFilterTransposeRule.INSTANCE,
- FilterProjectTransposeRule.INSTANCE,
- ProjectRemoveRule.INSTANCE,
+ // prune empty results rules
+ PruneEmptyRules.FILTER_INSTANCE,
+ PruneEmptyRules.PROJECT_INSTANCE,
+ PruneEmptyRules.UNION_INSTANCE,
- // merge and push unions rules
- UnionEliminatorRule.INSTANCE,
+ // push and merge projection rules
+ ProjectFilterTransposeRule.INSTANCE,
+ FilterProjectTransposeRule.INSTANCE,
+ ProjectRemoveRule.INSTANCE,
+
+ // simplify expressions rules
+ ReduceExpressionsRule.CALC_INSTANCE,
+
+ // merge and push unions rules
+ UnionEliminatorRule.INSTANCE,
- // translate to DataStream nodes
- DataStreamCalcRule.INSTANCE,
- DataStreamScanRule.INSTANCE,
- DataStreamUnionRule.INSTANCE,
- DataStreamValuesRule.INSTANCE
+ // translate to DataStream nodes
+ DataStreamCalcRule.INSTANCE,
+ DataStreamScanRule.INSTANCE,
+ DataStreamUnionRule.INSTANCE,
+ DataStreamValuesRule.INSTANCE
)
+ RuleSets.ofList(rules ++ StreamRules.RULES.asList.take(7))
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
new file mode 100644
index 0000000..3b389bc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
+import org.apache.calcite.prepare.RelOptTableImpl
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.schema.StreamableTable
+import org.apache.flink.api.table.plan.schema.TransStreamTable
+
+/**
+ * Custom rule that converts a LogicalScan into another LogicalScan
+ * whose internal Table is [[StreamableTable]] and [[org.apache.calcite.schema.TranslatableTable]].
+ */
+class LogicalScanToStreamable(
+ operand: RelOptRuleOperand,
+ description: String) extends RelOptRule(operand, description) {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val oldRel = call.rel(0).asInstanceOf[LogicalTableScan]
+ val table = oldRel.getTable
+ table.unwrap(classOf[StreamableTable]) match {
+ case s: StreamableTable =>
+ // already a StreamableTable => do nothing
+ case _ => // convert to a StreamableTable
+ val sTable = new TransStreamTable(oldRel, false)
+ val newRel = LogicalTableScan.create(oldRel.getCluster,
+ RelOptTableImpl.create(table.getRelOptSchema, table.getRowType, sTable))
+ call.transformTo(newRel)
+ }
+ }
+}
+
+object LogicalScanToStreamable {
+ val INSTANCE = new LogicalScanToStreamable(
+ operand(classOf[LogicalTableScan], any),
+ "LogicalScanToStreamable")
+}
+
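The `onMatch` method of the rule above leaves an already streamable table alone and wraps everything else, so applying the conversion repeatedly has no further effect. Below is a toy model of that idempotent wrapping in plain Java. The interfaces are simplified stand-ins invented for this sketch, not Calcite's `Table` or `StreamableTable`, and the planner machinery is left out entirely.

```java
// Toy model only: simplified stand-ins, not Calcite's schema interfaces.
final class StreamableSketch {

    interface Table {}

    interface Streamable extends Table {
        Table stream();
    }

    static final class PlainTable implements Table {}

    // Wraps a non-streamable table; stream() hands back the underlying table,
    // loosely mirroring the "wrapper" case described in TransStreamTable.
    static final class StreamableWrapper implements Streamable {
        private final Table wrapped;

        StreamableWrapper(Table wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public Table stream() {
            return wrapped;
        }
    }

    // Already streamable: return as-is. Otherwise: wrap once.
    static Table toStreamable(Table table) {
        if (table instanceof Streamable) {
            return table;
        }
        return new StreamableWrapper(table);
    }

    public static void main(String[] args) {
        Table once = toStreamable(new PlainTable());
        Table twice = toStreamable(once);
        System.out.println(once == twice); // second application is a no-op
    }
}
```

The early return is what keeps a rewrite of this kind from firing again on its own output.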
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
index 1523f93..ffc2692 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
@@ -33,5 +33,4 @@ class DataStreamTable[T](
.foreach( f => builder.add(f._1, f._2).nullable(true) )
builder.build
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
new file mode 100644
index 0000000..bc27659
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{StreamableTable, Table, TranslatableTable}
+
+/**
+ * A [[org.apache.calcite.schema.Table]] implementation for registering
+ * Streaming Table API Tables in the Calcite schema to be used by Flink SQL.
+ * It implements [[TranslatableTable]] so that its logical scan
+ * can be converted to a relational expression and [[StreamableTable]]
+ * so that it can be used in Streaming SQL queries.
+ *
+ * Besides registering Streaming Tables, this implementation is also used
+ * in [[org.apache.flink.api.table.plan.rules.LogicalScanToStreamable]]
+ * rule to convert a logical scan of a non-Streamable Table into
+ * a logical scan of a Streamable table, i.e. of this class.
+ *
+ * @see [[DataStreamTable]]
+ */
+class TransStreamTable(relNode: RelNode, wrapper: Boolean)
+ extends AbstractTable
+ with TranslatableTable
+ with StreamableTable {
+
+ override def getJdbcTableType: TableType = ???
+
+ override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
+
+ override def stream(): Table = {
+ if (wrapper) {
+ // we need to return a wrapper non-streamable table,
+ // otherwise Calcite's rule-matching produces an infinite loop
+ new StreamTable(relNode)
+ }
+ else {
+ this
+ }
+ }
+
+ override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode =
+ relNode
+
+ /**
+ * Wraps a [[TransStreamTable]]'s relNode
+ * to implement its stream() method.
+ */
+ class StreamTable(relNode: RelNode) extends AbstractTable {
+
+ override def getJdbcTableType: TableType = ???
+
+ override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+ relNode.getRowType
+ }
+ }
+}
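The stream() behaviour of TransStreamTable above can be illustrated outside Calcite. The following is only a minimal sketch: Tbl, Streamable, PlainTable and TransTable are hypothetical stand-ins invented for this example, not Calcite or Flink classes, and the row type is reduced to a plain string. It shows just the one decision made in stream(): with the wrapper flag set, a separate non-streamable object carrying the same row type is returned; otherwise the table returns itself.

```java
public class StreamWrapperSketch {

    /** Hypothetical stand-in for a schema table; not Calcite's Table. */
    interface Tbl {
        String rowType();
    }

    /** Hypothetical stand-in for a table that can be read as a stream. */
    interface Streamable extends Tbl {
        Tbl stream();
    }

    /** Non-streamable wrapper that only carries the row type. */
    static final class PlainTable implements Tbl {
        private final String rowType;

        PlainTable(String rowType) {
            this.rowType = rowType;
        }

        @Override
        public String rowType() {
            return rowType;
        }
    }

    /** Mirrors the wrapper flag of TransStreamTable in simplified form. */
    static final class TransTable implements Streamable {
        private final String rowType;
        private final boolean wrapper;

        TransTable(String rowType, boolean wrapper) {
            this.rowType = rowType;
            this.wrapper = wrapper;
        }

        @Override
        public String rowType() {
            return rowType;
        }

        @Override
        public Tbl stream() {
            // With the flag set, hand out a table that is NOT Streamable,
            // so nothing downstream can ask it for a stream again.
            return wrapper ? new PlainTable(rowType) : this;
        }
    }

    public static void main(String[] args) {
        Tbl wrapped = new TransTable("a,b,c", true).stream();
        System.out.println("wrapped is Streamable: " + (wrapped instanceof Streamable));

        TransTable plain = new TransTable("a,b,c", false);
        System.out.println("unwrapped returns itself: " + (plain.stream() == plain));
    }
}
```

How Calcite's rule matching reacts to each case is not modelled here; the sketch only makes the two return paths visible.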
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
new file mode 100644
index 0000000..31a5d53
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sql.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class BatchSQLITCase extends TableProgramsTestBase {
+
+ public BatchSQLITCase(TestExecutionMode mode, TableConfigMode configMode) {
+ super(mode, configMode);
+ }
+
+ @Test
+ public void testSelectFromTable() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table in = tableEnv.fromDataSet(ds, "a,b,c");
+ tableEnv.registerTable("T", in);
+
+ String sqlQuery = "SELECT a, c FROM T";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
+ "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n" +
+ "7,Comment#1\n" + "8,Comment#2\n" + "9,Comment#3\n" + "10,Comment#4\n" +
+ "11,Comment#5\n" + "12,Comment#6\n" + "13,Comment#7\n" +
+ "14,Comment#8\n" + "15,Comment#9\n" + "16,Comment#10\n" +
+ "17,Comment#11\n" + "18,Comment#12\n" + "19,Comment#13\n" +
+ "20,Comment#14\n" + "21,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testFilterFromDataSet() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
+
+ String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "2\n" + "3\n" + "4";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAggregation() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet("AggTable", ds, "x, y, z");
+
+ String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "231,1,21,21,11";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testJoin() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+ tableEnv.registerDataSet("t1", ds1, "a, b, c");
+ tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
+
+ String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+ compareResultAsText(results, expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
new file mode 100644
index 0000000..8e420f2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sql.test;
+
+import org.apache.flink.api.java.table.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.api.java.table.test.utils.StreamTestData;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamingSQLITCase extends StreamingMultipleProgramsTestBase {
+
+ @Test
+ public void testSelect() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ StreamITCase.clear();
+
+ DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataSet(env);
+ Table in = tableEnv.fromDataStream(ds, "a,b,c");
+ tableEnv.registerTable("MyTable", in);
+
+ String sqlQuery = "SELECT STREAM * FROM MyTable";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ resultSet.addSink(new StreamITCase.StringSink());
+ env.execute();
+
+ List<String> expected = new ArrayList<>();
+ expected.add("1,1,Hi");
+ expected.add("2,2,Hello");
+ expected.add("3,2,Hello world");
+
+ StreamITCase.compareWithList(expected);
+ }
+
+ @Test
+ public void testFilter() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ StreamITCase.clear();
+
+ DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+ tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
+
+ String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 4";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ resultSet.addSink(new StreamITCase.StringSink());
+ env.execute();
+
+ List<String> expected = new ArrayList<>();
+ expected.add("1,1,1");
+ expected.add("2,2,2");
+ expected.add("2,3,1");
+ expected.add("3,4,2");
+
+ StreamITCase.compareWithList(expected);
+ }
+
+ @Test
+ public void testUnion() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+ StreamITCase.clear();
+
+ DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataSet(env);
+ Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
+ tableEnv.registerTable("T1", t1);
+
+ DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
+ tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
+
+ String sqlQuery = "SELECT STREAM * FROM T1 " +
+ "UNION ALL " +
+ "(SELECT STREAM a, b, c FROM T2 WHERE a < 3)";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ resultSet.addSink(new StreamITCase.StringSink());
+ env.execute();
+
+ List<String> expected = new ArrayList<>();
+ expected.add("1,1,Hi");
+ expected.add("2,2,Hello");
+ expected.add("3,2,Hello world");
+ expected.add("1,1,Hallo");
+ expected.add("2,2,Hallo Welt");
+ expected.add("2,3,Hallo Welt wie");
+
+ StreamITCase.compareWithList(expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
new file mode 100644
index 0000000..dc3a8dc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StreamTestData {
+
+ public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
+
+ List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+ data.add(new Tuple3<>(1, 1L, "Hi"));
+ data.add(new Tuple3<>(2, 2L, "Hello"));
+ data.add(new Tuple3<>(3, 2L, "Hello world"));
+
+ Collections.shuffle(data);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
+
+ List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
+ data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+ data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+ data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+ data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+ data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+ data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+ data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+ data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+ data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+ data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+ data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+ data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+ data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+ data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+ data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+ return env.fromCollection(data);
+ }
+}
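Because getSmall3TupleDataSet shuffles its input, and stream results can arrive in any order, the tests in this commit compare sorted lists rather than raw output. A minimal sketch of that order-insensitive comparison follows. The class and method names are hypothetical and not part of the commit. Unlike StreamITCase.compareWithList, which sorts the expected list in place, this sketch sorts copies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OrderInsensitiveCompareSketch {

    /** Returns true if both lists hold the same strings, ignoring order. */
    static boolean sameElements(List<String> expected, List<String> actual) {
        // Sort copies so that the caller's lists are left untouched.
        List<String> e = new ArrayList<>(expected);
        List<String> a = new ArrayList<>(actual);
        Collections.sort(e);
        Collections.sort(a);
        return e.equals(a);
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("1,1,Hi", "2,2,Hello", "3,2,Hello world");

        // Simulate results arriving in an arbitrary order.
        List<String> actual = new ArrayList<>(expected);
        Collections.shuffle(actual);

        System.out.println("same elements: " + sameElements(expected, actual));
    }
}
```

Sorting keeps duplicates significant, so a result that contains a row twice does not match an expectation that lists it once.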
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala
new file mode 100644
index 0000000..573a628
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.streaming.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class StreamSQLITCase extends StreamingMultipleProgramsTestBase {
+
+ /** test selection **/
+ @Test
+ def testSelectExpressionFromTable(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT STREAM a * 2, b - 1 FROM MyTable"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList("2,0", "4,1", "6,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test filtering with registered table **/
+ @Test
+ def testSimpleFilter(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList("3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test filtering with registered datastream **/
+ @Test
+ def testDatastreamFilter(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT STREAM * FROM MyTable WHERE _1 = 3"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env)
+ tEnv.registerDataStream("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList("3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test union with registered tables **/
+ @Test
+ def testUnion(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT STREAM * FROM T1 " +
+ "UNION ALL " +
+ "SELECT STREAM * FROM T2"
+
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T1", t1)
+ val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1,Hi", "1,1,Hi",
+ "2,2,Hello", "2,2,Hello",
+ "3,2,Hello world", "3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test union with filter **/
+ @Test
+ def testUnionWithFilter(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT STREAM * FROM T1 WHERE a = 3 " +
+ "UNION ALL " +
+ "SELECT STREAM * FROM T2 WHERE a = 2"
+
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T1", t1)
+ val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello",
+ "3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test union of a table and a datastream **/
+ @Test
+ def testUnionTableWithDataSet(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT STREAM c FROM T1 WHERE a = 3 " +
+ "UNION ALL " +
+ "SELECT STREAM c FROM T2 WHERE a = 2"
+
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T1", t1)
+ val t2 = StreamTestData.get3TupleDataStream(env)
+ tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList("Hello", "Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
index 4760d37..3147a8e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
@@ -24,9 +24,7 @@ import org.apache.flink.api.table.{TableEnvironment, Row}
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.JavaConversions._
import org.junit.Test
import org.junit.Assert._
import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
index 75a9c97..d606a80 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
@@ -29,7 +29,7 @@ import org.junit.Assert._
import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
-class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
+class SelectITCase extends StreamingMultipleProgramsTestBase {
@Test
def testSimpleSelectAll(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
index 76d26ab..e7dc518 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
@@ -18,19 +18,27 @@
package org.apache.flink.api.scala.table.streaming.test.utils
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
+import java.util.Collections
+
import org.apache.flink.api.table.Row
+import org.junit.Assert._
import scala.collection.mutable
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import scala.collection.JavaConverters._
object StreamITCase {
var testResults = mutable.MutableList.empty[String]
+ def clear = {
+ StreamITCase.testResults.clear()
+ }
+
+ def compareWithList(expected: java.util.List[String]): Unit = {
+ Collections.sort(expected)
+ assertEquals(expected.asScala, StreamITCase.testResults.sorted)
+ }
+
final class StringSink extends RichSinkFunction[Row]() {
def invoke(value: Row) {
testResults.synchronized {
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
index 04d2c2e..3ab5f95 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
@@ -19,13 +19,9 @@
package org.apache.flink.api.scala.table.streaming.test.utils
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
import scala.collection.mutable
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
object StreamTestData {
[2/2] flink git commit: [FLINK-3794] Add checks for unsupported
operations in streaming table API
Posted by va...@apache.org.
[FLINK-3794] Add checks for unsupported operations in streaming table API
This closes #1921
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b06b01e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b06b01e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b06b01e
Branch: refs/heads/master
Commit: 4b06b01ea42561b46e6c38b70eb217abee0ebdf2
Parents: 51ff4a0
Author: vasia <va...@apache.org>
Authored: Thu Apr 21 12:17:32 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Apr 21 15:27:52 2016 +0200
----------------------------------------------------------------------
.../scala/table/StreamTableEnvironment.scala | 3 +-
.../org/apache/flink/api/table/table.scala | 74 +++++++++++++-------
.../streaming/test/UnsupportedOpsTest.scala | 62 ++++++++++++++++
3 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index 1bbfaaa..3beedcc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -90,7 +90,8 @@ class StreamTableEnvironment(
* [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
* Registered tables can be referenced in SQL queries.
*
- * The field names of the [[Table]] are automatically derived from the type of the [[DataStream]].
+ * The field names of the [[Table]] are automatically derived
+ * from the type of the [[DataStream]].
*
* @param name The name under which the [[DataStream]] is registered in the catalog.
* @param dataStream The [[DataStream]] to register.
http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index f9536a1..6485139 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -102,8 +102,14 @@ class Table(
// apply aggregations
if (aggCalls.nonEmpty) {
- val emptyKey: GroupKey = relBuilder.groupKey()
- relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
+ // aggregation on stream table is not currently supported
+ tableEnv match {
+ case _: StreamTableEnvironment =>
+ throw new TableException("Aggregation on stream tables is currently not supported.")
+ case _ =>
+ val emptyKey: GroupKey = relBuilder.groupKey()
+ relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
+ }
}
// get selection expressions
@@ -262,11 +268,18 @@ class Table(
*/
def groupBy(fields: Expression*): GroupedTable = {
- relBuilder.push(relNode)
- val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
- val groupKey = relBuilder.groupKey(groupExpr)
+ // group by on stream tables is currently not supported
+ tableEnv match {
+ case _: StreamTableEnvironment =>
+ throw new TableException("Group by on stream tables is currently not supported.")
+ case _ => {
+ relBuilder.push(relNode)
+ val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
+ val groupKey = relBuilder.groupKey(groupExpr)
- new GroupedTable(relBuilder.build(), tableEnv, groupKey)
+ new GroupedTable(relBuilder.build(), tableEnv, groupKey)
+ }
+ }
}
/**
@@ -294,9 +307,15 @@ class Table(
* }}}
*/
def distinct(): Table = {
- relBuilder.push(relNode)
- relBuilder.distinct()
- new Table(relBuilder.build(), tableEnv)
+ // distinct on stream table is not currently supported
+ tableEnv match {
+ case _: StreamTableEnvironment =>
+ throw new TableException("Distinct on stream tables is currently not supported.")
+ case _ =>
+ relBuilder.push(relNode)
+ relBuilder.distinct()
+ new Table(relBuilder.build(), tableEnv)
+ }
}
/**
@@ -314,24 +333,31 @@ class Table(
*/
def join(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new TableException("Only tables from the same TableEnvironment can be joined.")
- }
+ // join on stream tables is currently not supported
+ tableEnv match {
+ case _: StreamTableEnvironment =>
+ throw new TableException("Join on stream tables is currently not supported.")
+ case _ => {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new TableException("Only tables from the same TableEnvironment can be joined.")
+ }
- // check that join inputs do not have overlapping field names
- val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
- val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
- if (leftFields.intersect(rightFields).nonEmpty) {
- throw new IllegalArgumentException("Overlapping fields names on join input.")
- }
+ // check that join inputs do not have overlapping field names
+ val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
+ val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
+ if (leftFields.intersect(rightFields).nonEmpty) {
+ throw new IllegalArgumentException("Overlapping fields names on join input.")
+ }
- relBuilder.push(relNode)
- relBuilder.push(right.relNode)
+ relBuilder.push(relNode)
+ relBuilder.push(right.relNode)
- relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
- val join = relBuilder.build()
- new Table(join, tableEnv)
+ relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
+ val join = relBuilder.build()
+ new Table(join, tableEnv)
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
new file mode 100644
index 0000000..f7bd0ff
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.streaming.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
+import org.apache.flink.api.table.{TableException, TableEnvironment}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+
+import scala.collection.mutable
+
+class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testSelectWithAggregation(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGroupBy(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+ .groupBy('_1)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testDistinct(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
+ }
+
+ @Test(expected = classOf[TableException])
+ def testJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+ val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+ t1.join(t2)
+ }
+}