Posted to commits@flink.apache.org by va...@apache.org on 2016/04/21 15:34:51 UTC

[1/2] flink git commit: [FLINK-3727] Add support for embedded streaming SQL (projection, filter, union)

Repository: flink
Updated Branches:
  refs/heads/master 7478cba17 -> 4b06b01ea


[FLINK-3727] Add support for embedded streaming SQL (projection, filter, union)

- add methods to register DataStreams
- add sql translation method in StreamTableEnvironment
- add a custom rule to convert to streamable table
- add docs for streaming table and sql

This closes #1917
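
For illustration, a minimal end-to-end sketch of the added functionality, condensed from the new StreamSQLITCase tests (the example data is made up):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.{Row, TableEnvironment}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // register a DataStream as a streaming Table with named fields
    val stream = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))
    tEnv.registerDataStream("MyTable", stream, 'a, 'b, 'c)

    // projection and filtering via embedded streaming SQL (note the STREAM keyword)
    val result = tEnv.sql("SELECT STREAM a, c FROM MyTable WHERE a < 2")

    result.toDataStream[Row].print()
    env.execute()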


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51ff4a08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51ff4a08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51ff4a08

Branch: refs/heads/master
Commit: 51ff4a0867179d8b0e6c4dc07df46f871063b511
Parents: 7478cba
Author: vasia <va...@apache.org>
Authored: Fri Apr 15 13:35:24 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Apr 21 14:42:58 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/libs/table.md                   |  41 ++++-
 .../api/java/table/StreamTableEnvironment.scala |  48 ++++-
 .../scala/table/StreamTableEnvironment.scala    |  45 ++++-
 .../api/table/StreamTableEnvironment.scala      |  44 ++++-
 .../flink/api/table/TableEnvironment.scala      |  30 +++-
 .../rules/EnumerableToLogicalTableScan.scala    |   1 -
 .../api/table/plan/rules/FlinkRuleSets.scala    |  58 ++++---
 .../plan/rules/LogicalScanToStreamable.scala    |  56 ++++++
 .../api/table/plan/schema/DataStreamTable.scala |   1 -
 .../table/plan/schema/TransStreamTable.scala    |  78 +++++++++
 .../flink/api/java/sql/test/BatchSQLITCase.java | 121 +++++++++++++
 .../api/java/sql/test/StreamingSQLITCase.java   | 121 +++++++++++++
 .../java/table/test/utils/StreamTestData.java   |  64 +++++++
 .../sql/streaming/test/StreamSQLITCase.scala    | 173 +++++++++++++++++++
 .../table/streaming/test/FilterITCase.scala     |   2 -
 .../table/streaming/test/SelectITCase.scala     |   2 +-
 .../streaming/test/utils/StreamITCase.scala     |  18 +-
 .../streaming/test/utils/StreamTestData.scala   |   4 -
 18 files changed, 846 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/docs/apis/batch/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md
index 5b1280c..7b41cd3 100644
--- a/docs/apis/batch/libs/table.md
+++ b/docs/apis/batch/libs/table.md
@@ -59,16 +59,18 @@ Note that the Table API is currently not part of the binary distribution. See li
 
 Table API
 ----------
-The Table API provides methods to apply relational operations on DataSets, both in Scala and Java.
+The Table API provides methods to apply relational operations on DataSets and DataStreams, both in Scala and Java.
 
-The central concept of the Table API is a `Table` which represents a table with relational schema (or relation). Tables can be created from a `DataSet`, converted into a `DataSet`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments. 
+The central concept of the Table API is a `Table` which represents a table with relational schema (or relation). Tables can be created from a `DataSet` or `DataStream`, converted into a `DataSet` or `DataStream`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments. 
 
-The following sections show by example how to use the Table API embedded in  the Scala and Java DataSet APIs.
+*Note that the only operations currently supported on streaming Tables are selection, filtering, and union.*
+
+The following sections show by example how to use the Table API embedded in the Scala and Java DataSet and DataStream APIs.
 
 ### Scala Table API
 
 The Table API is enabled by importing `org.apache.flink.api.scala.table._`. This enables
-implicit conversions to convert a DataSet to a Table. The following example shows:
+implicit conversions to convert a `DataSet` or `DataStream` to a Table. The following example shows:
 
 - how a `DataSet` is converted to a `Table`,
 - how relational queries are specified, and 
@@ -112,6 +114,23 @@ val joined = input1.join(input2)
 
 Notice how the field names of a Table can be changed with `as()` or specified with `toTable()` when converting a DataSet to a Table. In addition, the example shows how to use Strings to specify relational expressions.
 
+Creating a `Table` from a `DataStream` works in a similar way.
+The following example shows how to convert a `DataStream` to a `Table` and filter it with the Table API.
+
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val inputStream = env.addSource(...)
+val result = inputStream
+                .toTable(tEnv, 'a, 'b, 'c)
+                .filter('a === 3)
+val resultStream = result.toDataStream[Row]
+{% endhighlight %}
+
 Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a description of the expression syntax.
 
 {% top %}
@@ -419,8 +438,8 @@ Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A `LO
 SQL
 ----
 The Table API also supports embedded SQL queries.
-In order to use a `Table` or `DataSet` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
-A registered `Table` can be retrieved back from the `TableEnvironment` using the `scan()` method:
+In order to use a `Table`, `DataSet`, or `DataStream` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
+A registered DataSet `Table` can be retrieved from the `TableEnvironment` using the `scan()` method, and a registered DataStream `Table` can be retrieved using the `ingest()` method.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -452,9 +471,11 @@ val t = tableEnv.scan("MyTable")
 </div>
 </div>
 
-*Note: Table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern, as this is reserved for internal use only.*
+A DataStream `Table` can be registered in the `StreamTableEnvironment` using the corresponding `registerDataStream` method.
+
+*Note: DataSet Table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern and DataStream Table names are not allowed to follow the `^_DataStreamTable_[0-9]+` pattern, as these are reserved for internal use only.*
 
-When registering a `DataSet`, one can also specify the field names of the table:
+When registering a `DataSet` or `DataStream`, one can also specify the field names of the table:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -492,7 +513,7 @@ tableEnv.registerTable("Orders", t)
 </div>
 </div>
 
-Registered tables can be used in SQL queries. A SQL query is defined using the `sql()` method of the `TableEnvironment`. It returns a new `Table` which can be converted back to a `DataSet` or used in subsequent Table API queries.
+Registered tables can be used in SQL queries. A SQL query is defined using the `sql()` method of the `TableEnvironment`. It returns a new `Table` which can be converted back to a `DataSet` or a `DataStream`, or used in subsequent Table API queries.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -524,6 +545,8 @@ val result = tableEnv.sql("SELECT SUM(amount) FROM Orders WHERE product = 10")
 </div>
 </div>
 
+SQL queries can be executed on DataStream Tables by adding the `STREAM` SQL keyword before the table name. Please refer to the [Apache Calcite SQL Streaming documentation](https://calcite.apache.org/docs/stream.html) for more information on the Streaming SQL syntax.
+
 {% top %}
 
 Runtime Configuration
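
To complement the documentation above, a minimal sketch of registering a DataStream and querying it with streaming SQL (table name and data are hypothetical); note that a streaming Table is retrieved with ingest() rather than scan():

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val orders = env.fromElements(("bob", 10, 150L), ("alice", 7, 30L))
    tEnv.registerDataStream("Orders", orders, 'customer, 'product, 'amount)

    // a registered DataStream Table is retrieved with ingest(), not scan()
    val table = tEnv.ingest("Orders")

    // it can also be referenced directly in a streaming SQL query
    val result = tEnv.sql("SELECT STREAM customer, amount FROM Orders WHERE amount > 100")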

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 980e45b..2faf13d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -58,7 +58,7 @@ class StreamTableEnvironment(
   def fromDataStream[T](dataStream: DataStream[T]): Table = {
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream)
+    registerDataStreamInternal(name, dataStream, false)
     ingest(name)
   }
 
@@ -83,11 +83,55 @@ class StreamTableEnvironment(
       .toArray
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream, exprs)
+    registerDataStreamInternal(name, dataStream, exprs, false)
     ingest(name)
   }
 
   /**
+    * Registers the given [[DataStream]] as table in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived
+    * from the type of the [[DataStream]].
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream, true)
+  }
+
+  /**
+    * Registers the given [[DataStream]] as table with specified field names in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataStream<Tuple2<String, Long>> stream = ...
+    *   tableEnv.registerDataStream("myTable", stream, "a, b")
+    * }}}
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: String): Unit = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream, exprs, true)
+  }
+
+  /**
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index 48de953..1bbfaaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -19,7 +19,6 @@ package org.apache.flink.api.scala.table
 
 import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.apache.flink.api.table.{Row, TableConfig, Table}
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
@@ -60,7 +59,7 @@ class StreamTableEnvironment(
   def fromDataStream[T](dataStream: DataStream[T]): Table = {
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream.javaStream)
+    registerDataStreamInternal(name, dataStream.javaStream, false)
     ingest(name)
   }
 
@@ -82,11 +81,51 @@ class StreamTableEnvironment(
   def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, false)
     ingest(name)
   }
 
   /**
+    * Registers the given [[DataStream]] as table in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataStream]].
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream.javaStream, true)
+  }
+
+  /**
+    * Registers the given [[DataStream]] as table with specified field names in the
+    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   val stream: DataStream[(String, Long)] = ...
+    *   tableEnv.registerDataStream("myTable", stream, 'a, 'b)
+    * }}}
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = {
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, true)
+  }
+
+  /**
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
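
A short sketch of the two new Scala overloads (stream contents hypothetical): without field expressions the names are derived from the type, so tuple fields keep their default names _1, _2, ...:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val stream = env.fromElements((1, "a"), (2, "b"))

    // field names derived from the tuple type: _1, _2
    tEnv.registerDataStream("T1", stream)

    // field names given explicitly as expressions
    tEnv.registerDataStream("T2", stream, 'id, 'tag)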

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 8724b5a..48f33f8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.DataStreamTable
+import org.apache.flink.api.table.plan.schema.{TransStreamTable, DataStreamTable}
 import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
@@ -103,7 +103,15 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
     */
   override def sql(query: String): Table = {
 
-    ???
+    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
+    // parse the sql query
+    val parsed = planner.parse(query)
+    // validate the sql query
+    val validated = planner.validate(parsed)
+    // transform to a relational tree
+    val relational = planner.rel(validated)
+
+    new Table(relational.rel, this)
   }
 
   /**
@@ -112,11 +120,14 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
     *
     * @param name The name under which the table is registered in the catalog.
     * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @param wrapper True if the registration has to wrap the [[DataStreamTable]]
+    *                into a [[org.apache.calcite.schema.StreamableTable]]
     * @tparam T the type of the [[DataStream]].
     */
   protected def registerDataStreamInternal[T](
     name: String,
-    dataStream: DataStream[T]): Unit = {
+    dataStream: DataStream[T],
+    wrapper: Boolean): Unit = {
 
     val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
     val dataStreamTable = new DataStreamTable[T](
@@ -124,7 +135,16 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
       fieldIndexes,
       fieldNames
     )
-    registerTableInternal(name, dataStreamTable)
+    // when registering a DataStream, we need to wrap it into a StreamableTable
+    // so that the SQL validation phase won't fail
+    if (wrapper) {
+      registerTableInternal(name, dataStreamTable)
+      val t = ingest(name)
+      replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true))
+    }
+    else {
+      registerTableInternal(name, dataStreamTable)
+    }
   }
 
   /**
@@ -134,12 +154,15 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
     * @param name The name under which the table is registered in the catalog.
     * @param dataStream The [[DataStream]] to register as table in the catalog.
     * @param fields The field expressions to define the field names of the table.
+    * @param wrapper True if the registration has to wrap the [[DataStreamTable]]
+    *                into a [[org.apache.calcite.schema.StreamableTable]]
     * @tparam T The type of the [[DataStream]].
     */
   protected def registerDataStreamInternal[T](
     name: String,
     dataStream: DataStream[T],
-    fields: Array[Expression]): Unit = {
+    fields: Array[Expression],
+    wrapper: Boolean): Unit = {
 
     val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray)
     val dataStreamTable = new DataStreamTable[T](
@@ -147,7 +170,16 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
       fieldIndexes.toArray,
       fieldNames.toArray
     )
-    registerTableInternal(name, dataStreamTable)
+    // when registering a DataStream, we need to wrap it into a StreamableTable
+    // so that the SQL validation phase won't fail
+    if (wrapper) {
+      registerTableInternal(name, dataStreamTable)
+      val t = ingest(name)
+      replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true))
+    }
+    else {
+      registerTableInternal(name, dataStreamTable)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 77830ca..2c166b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -37,7 +37,7 @@ import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTa
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.TableTable
+import org.apache.flink.api.table.plan.schema.{TransStreamTable, TableTable}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
 
@@ -98,8 +98,32 @@ abstract class TableEnvironment(val config: TableConfig) {
 
     checkValidTableName(name)
 
-    val tableTable = new TableTable(table.getRelNode)
-    registerTableInternal(name, tableTable)
+    table.tableEnv match {
+      case e: BatchTableEnvironment =>
+        val tableTable = new TableTable(table.getRelNode)
+        registerTableInternal(name, tableTable)
+      case e: StreamTableEnvironment =>
+        val sTableTable = new TransStreamTable(table.getRelNode, true)
+        tables.add(name, sTableTable)
+    }
+
+  }
+
+  /**
+    * Replaces a registered Table with another Table under the same name.
+    * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]]
+    * with a [[org.apache.calcite.schema.TranslatableTable]].
+    *
+    * @param name The name under which the table is currently registered.
+    * @param table The table that replaces the previously registered table.
+    */
+  protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
+
+    if (isRegistered(name)) {
+      tables.add(name, table)
+    } else {
+      throw new TableException(s"Table \'$name\' is not registered.")
+    }
   }
 
   /**
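
From the user's perspective, registerTable now accepts Tables backed by a StreamTableEnvironment as well; a minimal sketch (data hypothetical) mirroring the new tests:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // registering a streaming Table takes the TransStreamTable branch internally
    val t = env.fromElements((1, "Hi"), (2, "Hello")).toTable(tEnv, 'a, 'b)
    tEnv.registerTable("T", t)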

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
index 02d2159..ee515c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.table.plan.rules
 
-
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan
 import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index e9bcaa2..705bd83 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -19,9 +19,12 @@
 package org.apache.flink.api.table.plan.rules
 
 import org.apache.calcite.rel.rules._
+import org.apache.calcite.rel.stream.StreamRules
 import org.apache.calcite.tools.{RuleSets, RuleSet}
 import org.apache.flink.api.table.plan.rules.dataSet._
 import org.apache.flink.api.table.plan.rules.datastream._
+import org.apache.flink.api.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule}
+import scala.collection.JavaConversions._
 
 object FlinkRuleSets {
 
@@ -105,36 +108,43 @@ object FlinkRuleSets {
   /**
  * RuleSet to optimize plans for streaming / DataStream execution
   */
-  val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
+  val DATASTREAM_OPT_RULES: RuleSet = {
 
-    // calc rules
-    FilterToCalcRule.INSTANCE,
-    ProjectToCalcRule.INSTANCE,
-    FilterCalcMergeRule.INSTANCE,
-    ProjectCalcMergeRule.INSTANCE,
-    CalcMergeRule.INSTANCE,
+    val rules = List(
 
-    // prune empty results rules
-    PruneEmptyRules.FILTER_INSTANCE,
-    PruneEmptyRules.PROJECT_INSTANCE,
-    PruneEmptyRules.UNION_INSTANCE,
+      EnumerableToLogicalTableScan.INSTANCE,
+      LogicalScanToStreamable.INSTANCE,
 
-    // simplify expressions rules
-    ReduceExpressionsRule.CALC_INSTANCE,
+      // calc rules
+      FilterToCalcRule.INSTANCE,
+      ProjectToCalcRule.INSTANCE,
+      FilterCalcMergeRule.INSTANCE,
+      ProjectCalcMergeRule.INSTANCE,
+      CalcMergeRule.INSTANCE,
 
-    // push and merge projection rules
-    ProjectFilterTransposeRule.INSTANCE,
-    FilterProjectTransposeRule.INSTANCE,
-    ProjectRemoveRule.INSTANCE,
+      // prune empty results rules
+      PruneEmptyRules.FILTER_INSTANCE,
+      PruneEmptyRules.PROJECT_INSTANCE,
+      PruneEmptyRules.UNION_INSTANCE,
 
-    // merge and push unions rules
-    UnionEliminatorRule.INSTANCE,
+      // push and merge projection rules
+      ProjectFilterTransposeRule.INSTANCE,
+      FilterProjectTransposeRule.INSTANCE,
+      ProjectRemoveRule.INSTANCE,
+
+      // simplify expressions rules
+      ReduceExpressionsRule.CALC_INSTANCE,
+
+      // merge and push unions rules
+      UnionEliminatorRule.INSTANCE,
 
-    // translate to DataStream nodes
-    DataStreamCalcRule.INSTANCE,
-    DataStreamScanRule.INSTANCE,
-    DataStreamUnionRule.INSTANCE,
-    DataStreamValuesRule.INSTANCE
+      // translate to DataStream nodes
+      DataStreamCalcRule.INSTANCE,
+      DataStreamScanRule.INSTANCE,
+      DataStreamUnionRule.INSTANCE,
+      DataStreamValuesRule.INSTANCE
   )
 
+    RuleSets.ofList(rules ++ StreamRules.RULES.asList.take(7))
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
new file mode 100644
index 0000000..3b389bc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
+import org.apache.calcite.prepare.RelOptTableImpl
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.schema.StreamableTable
+import org.apache.flink.api.table.plan.schema.TransStreamTable
+
+/**
+  * Custom rule that converts a LogicalTableScan into another LogicalTableScan
+  * whose internal Table is both a [[StreamableTable]] and a [[org.apache.calcite.schema.TranslatableTable]].
+  */
+class LogicalScanToStreamable(
+    operand: RelOptRuleOperand,
+    description: String) extends RelOptRule(operand, description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val oldRel = call.rel(0).asInstanceOf[LogicalTableScan]
+    val table = oldRel.getTable
+    table.unwrap(classOf[StreamableTable]) match {
+      case s: StreamableTable =>
+        // already a StreamableTable => do nothing
+      case _ => // convert to a StreamableTable
+        val sTable = new TransStreamTable(oldRel, false)
+        val newRel = LogicalTableScan.create(oldRel.getCluster,
+          RelOptTableImpl.create(table.getRelOptSchema, table.getRowType, sTable))
+        call.transformTo(newRel)
+    }
+  }
+}
+
+object LogicalScanToStreamable {
+  val INSTANCE = new LogicalScanToStreamable(
+    operand(classOf[LogicalTableScan], any),
+    "LogicalScanToStreamable")
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
index 1523f93..ffc2692 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
@@ -33,5 +33,4 @@ class DataStreamTable[T](
       .foreach( f => builder.add(f._1, f._2).nullable(true) )
     builder.build
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
new file mode 100644
index 0000000..bc27659
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{StreamableTable, Table, TranslatableTable}
+
+/**
+  * A [[org.apache.calcite.schema.Table]] implementation for registering
+  * Streaming Table API Tables in the Calcite schema to be used by Flink SQL.
+  * It implements [[TranslatableTable]] so that its logical scan
+  * can be converted to a relational expression and [[StreamableTable]]
+  * so that it can be used in Streaming SQL queries.
+  *
+  * Besides registering streaming Tables, this implementation is also used
+  * by the [[org.apache.flink.api.table.plan.rules.LogicalScanToStreamable]]
+  * rule to convert a logical scan of a non-streamable Table into
+  * a logical scan of a streamable Table, i.e. of this class.
+  *
+  * @see [[DataStreamTable]]
+  */
+class TransStreamTable(relNode: RelNode, wrapper: Boolean)
+  extends AbstractTable
+  with TranslatableTable
+  with StreamableTable {
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
+
+  override def stream(): Table = {
+    if (wrapper) {
+      // we need to return a wrapper non-streamable table,
+      // otherwise Calcite's rule-matching produces an infinite loop
+      new StreamTable(relNode)
+    }
+    else {
+      this
+    }
+  }
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode =
+    relNode
+
+  /**
+    * Wraps a [[TransStreamTable]]'s relNode
+    * to implement its stream() method.
+    */
+  class StreamTable(relNode: RelNode) extends AbstractTable {
+
+    override def getJdbcTableType: TableType = ???
+
+    override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+      relNode.getRowType
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
new file mode 100644
index 0000000..31a5d53
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sql.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class BatchSQLITCase extends TableProgramsTestBase {
+
+	public BatchSQLITCase(TestExecutionMode mode, TableConfigMode configMode) {
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testSelectFromTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+		tableEnv.registerTable("T", in);
+
+		String sqlQuery = "SELECT a, c FROM T";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
+			"4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n" +
+			"7,Comment#1\n" + "8,Comment#2\n" + "9,Comment#3\n" + "10,Comment#4\n" +
+			"11,Comment#5\n" + "12,Comment#6\n" + "13,Comment#7\n" +
+			"14,Comment#8\n" + "15,Comment#9\n" + "16,Comment#10\n" +
+			"17,Comment#11\n" + "18,Comment#12\n" + "19,Comment#13\n" +
+			"20,Comment#14\n" + "21,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testFilterFromDataSet() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
+
+		String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "2\n" + "3\n" + "4";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("AggTable", ds, "x, y, z");
+
+		String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "231,1,21,21,11";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		tableEnv.registerDataSet("t1", ds1, "a, b, c");
+		tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
+
+		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+		compareResultAsText(results, expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
new file mode 100644
index 0000000..8e420f2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.sql.test;
+
+import org.apache.flink.api.java.table.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.api.java.table.test.utils.StreamTestData;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamingSQLITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void testSelect() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataStream(env);
+		Table in = tableEnv.fromDataStream(ds, "a,b,c");
+		tableEnv.registerTable("MyTable", in);
+
+		String sqlQuery = "SELECT STREAM * FROM MyTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testFilter() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
+
+		String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 4";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,1");
+		expected.add("2,2,2");
+		expected.add("2,3,1");
+		expected.add("3,4,2");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testUnion() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataStream(env);
+		Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
+		tableEnv.registerTable("T1", t1);
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
+		tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
+
+		String sqlQuery = "SELECT STREAM * FROM T1 " +
+							"UNION ALL " +
+							"(SELECT STREAM a, b, c FROM T2 WHERE a < 3)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+		expected.add("1,1,Hallo");
+		expected.add("2,2,Hallo Welt");
+		expected.add("2,3,Hallo Welt wie");
+
+		StreamITCase.compareWithList(expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
new file mode 100644
index 0000000..dc3a8dc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StreamTestData {
+
+	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataStream(StreamExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		return env.fromCollection(data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala
new file mode 100644
index 0000000..573a628
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/StreamSQLITCase.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.streaming.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class StreamSQLITCase extends StreamingMultipleProgramsTestBase {
+
+  /** test selection **/
+  @Test
+  def testSelectExpressionFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT STREAM a * 2, b - 1 FROM MyTable"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("2,0", "4,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test filtering with registered table **/
+  @Test
+  def testSimpleFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test filtering with registered datastream **/
+  @Test
+  def testDatastreamFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE _1 = 3"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+    tEnv.registerDataStream("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union with registered tables **/
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT STREAM * FROM T1 " +
+      "UNION ALL " +
+      "SELECT STREAM * FROM T2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hi", "1,1,Hi",
+      "2,2,Hello", "2,2,Hello",
+      "3,2,Hello world", "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union with filter **/
+  @Test
+  def testUnionWithFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT STREAM * FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT STREAM * FROM T2 WHERE a = 2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello",
+      "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union of a table and a datastream **/
+  @Test
+  def testUnionTableWithDataStream(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT STREAM c FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT STREAM c FROM T2 WHERE a = 2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.get3TupleDataStream(env)
+    tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("Hello", "Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
index 4760d37..3147a8e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
@@ -24,9 +24,7 @@ import org.apache.flink.api.table.{TableEnvironment, Row}
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.JavaConversions._
 import org.junit.Test
 import org.junit.Assert._
 import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
index 75a9c97..d606a80 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
@@ -29,7 +29,7 @@ import org.junit.Assert._
 import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
 import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
 
-class StreamSelectITCase extends StreamingMultipleProgramsTestBase {
+class SelectITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testSimpleSelectAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
index 76d26ab..e7dc518 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
@@ -18,19 +18,27 @@
 
 package org.apache.flink.api.scala.table.streaming.test.utils
 
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
+import java.util.Collections
+
 import org.apache.flink.api.table.Row
+import org.junit.Assert._
 import scala.collection.mutable
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import scala.collection.JavaConverters._
 
 object StreamITCase {
 
   var testResults = mutable.MutableList.empty[String]
 
+  def clear(): Unit = {
+    StreamITCase.testResults.clear()
+  }
+
+  def compareWithList(expected: java.util.List[String]): Unit = {
+    Collections.sort(expected)
+    assertEquals(expected.asScala, StreamITCase.testResults.sorted)
+  }
+
   final class StringSink extends RichSinkFunction[Row]() {
     def invoke(value: Row) {
       testResults.synchronized {

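Taken together, the new helpers let a test reset state and verify results in
two calls; a minimal hypothetical usage (assuming a DataStream[Row] named
result and a StreamExecutionEnvironment named env) looks like:

    StreamITCase.clear()                         // reset the shared result list
    result.addSink(new StreamITCase.StringSink)  // collect emitted rows as strings
    env.execute()
    // compareWithList sorts both expected and actual results before asserting
    StreamITCase.compareWithList(java.util.Arrays.asList("Hello", "Hello world"))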
http://git-wip-us.apache.org/repos/asf/flink/blob/51ff4a08/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
index 04d2c2e..3ab5f95 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
@@ -19,13 +19,9 @@
 package org.apache.flink.api.scala.table.streaming.test.utils
 
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
 import scala.collection.mutable
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 
 object StreamTestData {
 


[2/2] flink git commit: [FLINK-3794] Add checks for unsupported operations in streaming table API

Posted by va...@apache.org.
[FLINK-3794] Add checks for unsupported operations in streaming table API

This closes #1921


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b06b01e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b06b01e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b06b01e

Branch: refs/heads/master
Commit: 4b06b01ea42561b46e6c38b70eb217abee0ebdf2
Parents: 51ff4a0
Author: vasia <va...@apache.org>
Authored: Thu Apr 21 12:17:32 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Apr 21 15:27:52 2016 +0200

----------------------------------------------------------------------
 .../scala/table/StreamTableEnvironment.scala    |  3 +-
 .../org/apache/flink/api/table/table.scala      | 74 +++++++++++++-------
 .../streaming/test/UnsupportedOpsTest.scala     | 62 ++++++++++++++++
 3 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index 1bbfaaa..3beedcc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -90,7 +90,8 @@ class StreamTableEnvironment(
     * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
     * Registered tables can be referenced in SQL queries.
     *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataStream]].
+    * The field names of the [[Table]] are automatically derived
+    * from the type of the [[DataStream]].
     *
     * @param name The name under which the [[DataStream]] is registered in the catalog.
     * @param dataStream The [[DataStream]] to register.

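To illustrate the derived-name behavior described above (the stream contents
and table names below are assumptions): a DataStream of Scala tuples is
registered with fields _1, _2, ..., which is why the tests in this series
select on '_1; the overload taking expressions assigns names explicitly:

    // fields derived from the type: a DataStream[(Int, String)] yields _1, _2
    val stream = env.fromElements((1, "a"), (2, "b"))
    tEnv.registerDataStream("MyTable", stream)

    // or assign field names explicitly via the overload with expressions
    tEnv.registerDataStream("MyNamedTable", stream, 'id, 'name)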
http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index f9536a1..6485139 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -102,8 +102,14 @@ class Table(
 
     // apply aggregations
     if (aggCalls.nonEmpty) {
-      val emptyKey: GroupKey = relBuilder.groupKey()
-      relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
+      // aggregations on stream tables are currently not supported
+      tableEnv match {
+        case _: StreamTableEnvironment =>
+          throw new TableException("Aggregation on stream tables is currently not supported.")
+        case _ =>
+          val emptyKey: GroupKey = relBuilder.groupKey()
+          relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
+      }
     }
 
     // get selection expressions
@@ -262,11 +268,18 @@ class Table(
     */
   def groupBy(fields: Expression*): GroupedTable = {
 
-    relBuilder.push(relNode)
-    val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
-    val groupKey = relBuilder.groupKey(groupExpr)
+    // group by on stream tables is currently not supported
+    tableEnv match {
+      case _: StreamTableEnvironment =>
+        throw new TableException("Group by on stream tables is currently not supported.")
+      case _ => {
+        relBuilder.push(relNode)
+        val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
+        val groupKey = relBuilder.groupKey(groupExpr)
 
-    new GroupedTable(relBuilder.build(), tableEnv, groupKey)
+        new GroupedTable(relBuilder.build(), tableEnv, groupKey)
+      }
+    }
   }
 
   /**
@@ -294,9 +307,15 @@ class Table(
     * }}}
     */
   def distinct(): Table = {
-    relBuilder.push(relNode)
-    relBuilder.distinct()
-    new Table(relBuilder.build(), tableEnv)
+    // distinct on stream tables is currently not supported
+    tableEnv match {
+      case _: StreamTableEnvironment =>
+        throw new TableException("Distinct on stream tables is currently not supported.")
+      case _ =>
+        relBuilder.push(relNode)
+        relBuilder.distinct()
+        new Table(relBuilder.build(), tableEnv)
+    }
   }
 
   /**
@@ -314,24 +333,31 @@ class Table(
     */
   def join(right: Table): Table = {
 
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new TableException("Only tables from the same TableEnvironment can be joined.")
-    }
+    // join on stream tables is currently not supported
+    tableEnv match {
+      case _: StreamTableEnvironment =>
+        throw new TableException("Join on stream tables is currently not supported.")
+      case _ => {
+        // check that right table belongs to the same TableEnvironment
+        if (right.tableEnv != this.tableEnv) {
+          throw new TableException("Only tables from the same TableEnvironment can be joined.")
+        }
 
-    // check that join inputs do not have overlapping field names
-    val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
-    val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
-    if (leftFields.intersect(rightFields).nonEmpty) {
-      throw new IllegalArgumentException("Overlapping fields names on join input.")
-    }
+        // check that join inputs do not have overlapping field names
+        val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
+        val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
+        if (leftFields.intersect(rightFields).nonEmpty) {
+          throw new IllegalArgumentException("Overlapping field names on join input.")
+        }
 
-    relBuilder.push(relNode)
-    relBuilder.push(right.relNode)
+        relBuilder.push(relNode)
+        relBuilder.push(right.relNode)
 
-    relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
-    val join = relBuilder.build()
-    new Table(join, tableEnv)
+        relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
+        val join = relBuilder.build()
+        new Table(join, tableEnv)
+      }
+    }
   }
 
   /**

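All four operators now share the same guard shape: match on the
TableEnvironment and fail eagerly at plan-construction time for the streaming
case. As a sketch of the user-facing effect (mirroring the new
UnsupportedOpsTest below):

    // each of these calls now throws TableException on a stream-backed Table
    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
    t.select('_1.min)   // aggregation
    t.groupBy('_1)      // group by
    t.distinct()        // distinct
    t.join(t)           // join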
http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
new file mode 100644
index 0000000..f7bd0ff
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.streaming.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.api.table.{TableException, TableEnvironment}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+
+import scala.collection.mutable
+
+class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testSelectWithAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGroupBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+      .groupBy('_1)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testDistinct(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.join(t2)
+  }
+}