You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/18 19:22:56 UTC

[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).

Repository: flink
Updated Branches:
  refs/heads/master c995ebd29 -> 9fc42df68


[FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).

This closes #3930.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc42df6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc42df6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc42df6

Branch: refs/heads/master
Commit: 9fc42df68d746c633b0d3c8995e0031064bfd362
Parents: 3a65e5a
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed May 17 16:26:27 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200

----------------------------------------------------------------------
 .../common/WindowStartEndPropertiesRule.scala   | 39 ++++++++++++++------
 .../scala/stream/sql/WindowAggregateTest.scala  | 27 ++++++++++++++
 2 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 7577deb..14e9b21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val project = call.rel(0).asInstanceOf[LogicalProject]
     // project includes at least one group auxiliary function
-    project.getProjects.exists {
-      case c: RexCall => c.getOperator.isGroupAuxiliary
-      case _ => false
+
+    def hasGroupAuxiliaries(node: RexNode): Boolean = {
+      node match {
+        case c: RexCall if c.getOperator.isGroupAuxiliary => true
+        case c: RexCall =>
+          c.operands.exists(hasGroupAuxiliaries)
+        case _ => false
+      }
     }
+
+    project.getProjects.exists(hasGroupAuxiliaries)
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
+
     val project = call.rel(0).asInstanceOf[LogicalProject]
     val innerProject = call.rel(1).asInstanceOf[LogicalProject]
     val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
@@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule
     transformed.project(
       innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
 
-    // replace window auxiliary function by access to window properties
-    transformed.project(
-      project.getProjects.map{ x =>
-        if (WindowStartEndPropertiesRule.isWindowStart(x)) {
+    def replaceGroupAuxiliaries(node: RexNode): RexNode = {
+      node match {
+        case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
           // replace expression by access to window start
-          rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
-        } else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
+          rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
+        case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
           // replace expression by access to window end
-          rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
-        } else {
+          rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
+        case c: RexCall =>
+          // replace expressions in children
+          val newOps = c.getOperands.map(replaceGroupAuxiliaries)
+          c.clone(c.getType, newOps)
+        case x =>
           // preserve expression
           x
-        }
       }
+    }
+
+    // replace window auxiliary function by access to window properties
+    transformed.project(
+      project.getProjects.map(replaceGroupAuxiliaries)
     )
     val res = transformed.build()
     call.transformTo(res)

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 2022db8..f95d0ab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -150,6 +150,33 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testExpressionOnWindowAuxFunction() = {
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  TUMBLE_END(rowtime, INTERVAL '15' MINUTE) + INTERVAL '1' MINUTE " +
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "rowtime")
+          ),
+          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+          term("select", "COUNT(*) AS EXPR$0", "start('w$) AS w$start", "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS $f1")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
+
   @Test(expected = classOf[TableException])
   def testTumbleWindowNoOffset(): Unit = {
     val sqlQuery =


[2/4] flink git commit: [FLINK-6585] [table] Fix execution of Table examples in IDE.

Posted by fh...@apache.org.
[FLINK-6585] [table] Fix execution of Table examples in IDE.

This closes #3905.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5310ed0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5310ed0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5310ed0

Branch: refs/heads/master
Commit: d5310ed02b30ef16f06be6efc52af5e183df26a7
Parents: 41aa98a
Author: twalthr <tw...@apache.org>
Authored: Mon May 15 13:29:50 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200

----------------------------------------------------------------------
 flink-examples/flink-examples-table/pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5310ed0/flink-examples/flink-examples-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index e59d8c6..8574650 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -39,14 +39,14 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
+			<scope>compile</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
+			<scope>compile</scope>
 		</dependency>
 	</dependencies>
 


[3/4] flink git commit: [FLINK-6543] [table] Deprecate toDataStream and add toAppendStream.

Posted by fh...@apache.org.
[FLINK-6543] [table] Deprecate toDataStream and add toAppendStream.

This closes #3929.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a65e5ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a65e5ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a65e5ac

Branch: refs/heads/master
Commit: 3a65e5acbcc29636b0ce1631815861089fc21dca
Parents: d5310ed
Author: twalthr <tw...@apache.org>
Authored: Wed May 17 11:31:33 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200

----------------------------------------------------------------------
 .../table/examples/scala/StreamSQLExample.scala |   2 +-
 .../examples/scala/StreamTableExample.scala     |   2 +-
 .../flink/table/api/java/package-info.java      |   5 +-
 .../table/api/java/StreamTableEnvironment.scala | 121 ++++++++++++++++++-
 .../api/scala/StreamTableEnvironment.scala      |  55 ++++++++-
 .../table/api/scala/TableConversions.scala      |  96 +++++++++++++--
 .../apache/flink/table/api/scala/package.scala  |   2 +-
 .../table/api/java/stream/sql/SqlITCase.java    |   8 +-
 .../api/scala/stream/TableSourceITCase.scala    |   4 +-
 .../api/scala/stream/sql/OverWindowITCase.scala |  32 ++---
 .../table/api/scala/stream/sql/SqlITCase.scala  |  20 +--
 .../api/scala/stream/table/CalcITCase.scala     |  24 ++--
 .../table/GroupWindowAggregationsITCase.scala   |  10 +-
 .../scala/stream/table/OverWindowITCase.scala   |  10 +-
 .../api/scala/stream/table/UnionITCase.scala    |   8 +-
 .../datastream/DataStreamAggregateITCase.scala  |  12 +-
 .../datastream/DataStreamCalcITCase.scala       |   4 +-
 .../DataStreamUserDefinedFunctionITCase.scala   |  12 +-
 .../datastream/TimeAttributesITCase.scala       |  12 +-
 19 files changed, 339 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index 2cdd8b8..665913e 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -62,7 +62,7 @@ object StreamSQLExample {
       "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
         "SELECT * FROM OrderB WHERE amount < 2")
 
-    result.toDataStream[Order].print()
+    result.toAppendStream[Order].print()
 
     env.execute()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
index 5c5012b..1c0ffea 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
@@ -55,7 +55,7 @@ object StreamTableExample {
     val result: DataStream[Order] = orderA.unionAll(orderB)
       .select('user, 'product, 'amount)
       .where('amount > 2)
-      .toDataStream[Order]
+      .toAppendStream[Order]
 
     result.print()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
index 2409872..3dbf50f 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
@@ -57,8 +57,9 @@
  * <p>
  * As seen above, a {@link org.apache.flink.table.api.Table} can be converted back to the
  * underlying API representation using
- * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)}
- * or {@link org.apache.flink.table.api.java.StreamTableEnvironment#toDataStream(Table, java.lang.Class)}}.
+ * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)},
+ * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toAppendStream(Table, java.lang.Class)}, or
+ * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toRetractStream(Table, java.lang.Class)}.
  */
 package org.apache.flink.table.api.java;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 311986c..be94df9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -144,13 +144,122 @@ class StreamTableEnvironment(
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. In order to
+    *            make this more explicit in the future, please use toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. In order to
+    *            make this more explicit in the future, please use toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] =
+    toAppendStream(table, typeInfo)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. In order to
+    *            make this more explicit in the future, please use toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](
+      table: Table,
+      clazz: Class[T],
+      queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. In order to
+    *            make this more explicit in the future, please use toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T],
+      queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, typeInfo, queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
     * @param table The [[Table]] to convert.
     * @param clazz The class of the type of the resulting [[DataStream]].
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    toDataStream(table, clazz, queryConfig)
+  def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    toAppendStream(table, clazz, queryConfig)
   }
 
   /**
@@ -169,8 +278,8 @@ class StreamTableEnvironment(
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
-    toDataStream(table, typeInfo, queryConfig)
+  def toAppendStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+    toAppendStream(table, typeInfo, queryConfig)
   }
 
   /**
@@ -190,7 +299,7 @@ class StreamTableEnvironment(
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](
+  def toAppendStream[T](
       table: Table,
       clazz: Class[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {
@@ -216,7 +325,7 @@ class StreamTableEnvironment(
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](
+  def toAppendStream[T](
       table: Table,
       typeInfo: TypeInformation[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 8c6b273..bfd443a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -138,13 +138,17 @@ class StreamTableEnvironment(
     * types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
     * @param table The [[Table]] to convert.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
-    toDataStream(table, queryConfig)
-  }
+  @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table)
 
   /**
     * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
@@ -157,13 +161,58 @@ class StreamTableEnvironment(
     * types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
     * @param table The [[Table]] to convert.
     * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
+  @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
   def toDataStream[T: TypeInformation](
     table: Table,
+    queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
+    toAppendStream(table, queryConfig)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation](
+    table: Table,
     queryConfig: StreamQueryConfig): DataStream[T] = {
     val returnType = createTypeInformation[T]
     asScalaStream(translate(

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 9874a9e..bd431eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -32,7 +32,17 @@ import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTa
   */
 class TableConversions(table: Table) {
 
-  /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
   def toDataSet[T: TypeInformation]: DataSet[T] = {
 
     table.tableEnv match {
@@ -44,12 +54,71 @@ class TableConversions(table: Table) {
     }
   }
 
-  /** Converts the [[Table]] to a [[DataStream]] of the specified type. */
-  def toDataStream[T: TypeInformation]: DataStream[T] = {
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] =
+    toAppendStream(queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation]: DataStream[T] = {
 
     table.tableEnv match {
       case tEnv: ScalaStreamTableEnv =>
-        tEnv.toDataStream(table)
+        tEnv.toAppendStream(table)
       case _ =>
         throw new TableException(
           "Only tables that originate from Scala DataStreams " +
@@ -57,14 +126,25 @@ class TableConversions(table: Table) {
     }
   }
 
-  /** Converts the [[Table]] to a [[DataStream]] of the specified type.
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
     *
-    * @param queryConfig The configuration for the generated query.
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
     */
-  def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
+  def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
     table.tableEnv match {
       case tEnv: ScalaStreamTableEnv =>
-        tEnv.toDataStream(table, queryConfig)
+        tEnv.toAppendStream(table, queryConfig)
       case _ =>
         throw new TableException(
           "Only tables that originate from Scala DataStreams " +

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
index e8a2017..9d15c14 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -87,7 +87,7 @@ package object scala extends ImplicitExpressionConversions {
 
   implicit def table2RowDataStream(table: Table): DataStream[Row] = {
     val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv]
-    tableEnv.toDataStream[Row](table)
+    tableEnv.toAppendStream[Row](table)
   }
 
   implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 0c0b37e..d827cd6 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -66,7 +66,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		String sqlQuery = "SELECT a,c FROM MyTableRow";
 		Table result = tableEnv.sql(sqlQuery);
 
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink());
 		env.execute();
 
@@ -91,7 +91,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		String sqlQuery = "SELECT * FROM MyTable";
 		Table result = tableEnv.sql(sqlQuery);
 
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink());
 		env.execute();
 
@@ -115,7 +115,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
 		Table result = tableEnv.sql(sqlQuery);
 
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink());
 		env.execute();
 
@@ -146,7 +146,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 							"(SELECT a, b, c FROM T2 WHERE a	< 3)";
 		Table result = tableEnv.sql(sqlQuery);
 
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink());
 		env.execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 9298266..13ec2b4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -46,7 +46,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
 
     tEnv.sql(
       "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
-      .toDataStream[Row]
+      .toAppendStream[Row]
       .addSink(new StreamITCase.StringSink)
 
     env.execute()
@@ -71,7 +71,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     tEnv.scan("csvTable")
       .where('id > 4)
       .select('last, 'score * 2)
-      .toDataStream[Row]
+      .toAppendStream[Row]
       .addSink(new StreamITCase.StringSink)
 
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index a7fe1c4..7ba5c16 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -67,7 +67,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
   }
@@ -92,7 +92,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
       "FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -134,7 +134,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  MIN(c) OVER (" +
       "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
       "FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -177,7 +177,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -204,7 +204,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -234,7 +234,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -259,7 +259,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -321,7 +321,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -382,7 +382,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -450,7 +450,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -511,7 +511,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -572,7 +572,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -638,7 +638,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -700,7 +700,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -761,7 +761,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -836,7 +836,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index ba8c185..bdc1fcc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -57,7 +57,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = ds.toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTableRow", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -99,7 +99,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -120,7 +120,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -141,7 +141,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env)
     tEnv.registerDataStream("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -165,7 +165,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -192,7 +192,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -218,7 +218,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.get3TupleDataStream(env)
     tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -243,7 +243,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -275,7 +275,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -305,7 +305,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index 1114cf0..b355cf0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -41,7 +41,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -60,7 +60,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -79,7 +79,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
       .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
       .select('a, 'b)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -99,7 +99,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
       .select('a, 'b, 'c)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -118,7 +118,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -134,7 +134,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -151,7 +151,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -171,7 +171,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter('a === 3)
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -191,7 +191,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -210,7 +210,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -233,7 +233,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -257,7 +257,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
     val expected = mutable.MutableList(

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 81d3577..87a35bf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -70,7 +70,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select('string, countFun('int), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row](queryConfig)
+    val results = windowedTable.toAppendStream[Row](queryConfig)
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -112,7 +112,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select('string, countFun('int), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -138,7 +138,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select(countFun('string), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row](queryConfig)
+    val results = windowedTable.toAppendStream[Row](queryConfig)
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -166,7 +166,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
               weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -202,7 +202,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w, 'int2, 'int3, 'string)
       .select(weightAvgFun('long, 'int))
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
index b097767..f396896 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg)
       .select('c, 'mycount, 'wAvg)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -123,7 +123,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         'b.min over 'w,
         weightAvgFun('b, 'a) over 'w)
 
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     val windowedTable = table
       .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
       .select('a, 'c.sum over 'w, 'c.min over 'w)
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -241,7 +241,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
       .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
 
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -304,7 +304,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w)
       .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
 
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
index f35ee76..2b496e3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -43,7 +43,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
 
     val unionDs = ds1.unionAll(ds2).select('c)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -63,7 +63,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
 
     val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -82,7 +82,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
 
     val unionDs = ds1.unionAll(ds2)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -101,7 +101,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
 
     val unionDs = ds1.unionAll(ds2)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 05e1892..3ac664d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -69,7 +69,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w)
       .select('int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -104,7 +104,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -141,7 +141,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -175,7 +175,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -204,7 +204,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -232,7 +232,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
     val expected = Seq(

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index 1d48f2c..12d7202 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -48,7 +48,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
       .where("RichFunc2(c)='ABC#Hello'")
       .select('c)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -71,7 +71,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
       .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2")
       .select('c)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index b3d9c6f..9efe6a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -53,7 +53,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .join(pojoFunc0('c))
       .where('age > 20)
       .select('c, 'name, 'age)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()
@@ -70,7 +70,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
     val result = t
       .leftOuterJoin(func0('c) as('d, 'e))
       .select('c, 'd, 'e)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()
@@ -90,7 +90,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .join(func0('c) as('d, 'e))
       .where(Func18('d, "J"))
       .select('c, 'd, 'e)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()
@@ -111,7 +111,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .join(tableFunc1('c) as 's)
       .select('a, 's)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -135,7 +135,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .join(tableFunc1(richFunc2('c)) as 's)
       .select('a, 's)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -164,7 +164,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'd, 'e, 'f, 'g)
       .join(func32('c) as ('h, 'i))
       .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index 3f12218..f2d6175 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -103,7 +103,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
 
     val t = table.select('rowtime.cast(Types.STRING))
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -134,7 +134,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .filter('rowtime.cast(Types.LONG) > 4)
       .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -161,7 +161,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
 
     val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -191,7 +191,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
 
     val t = table.unionAll(table).select('rowtime)
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
       "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -262,7 +262,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .groupBy('w2)
       .select('w2.rowtime, 'w2.end, 'int.count)
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 


[4/4] flink git commit: [FLINK-6618] [table] Fix translation of WindowProperties in Table API.

Posted by fh...@apache.org.
[FLINK-6618] [table] Fix translation of WindowProperties in Table API.

This closes #3936.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41aa98a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41aa98a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41aa98a2

Branch: refs/heads/master
Commit: 41aa98a2189ea86ba9464fb0348463af822e332d
Parents: c995ebd
Author: sunjincheng121 <su...@gmail.com>
Authored: Thu May 18 13:02:24 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200

----------------------------------------------------------------------
 .../flink/table/plan/ProjectionTranslator.scala | 20 ++++++++++----------
 .../GroupWindowStringExpressionTest.scala       | 18 +++++++++++-------
 2 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/41aa98a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index 802768e..69b437a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -334,20 +334,20 @@ object ProjectionTranslator {
         val l = replaceAggFunctionCall(b.left, tableEnv)
         val r = replaceAggFunctionCall(b.right, tableEnv)
         b.makeCopy(Array(l, r))
-
       // Functions calls
       case c @ Call(name, args) =>
         val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
-        if (function.isInstanceOf[AggFunctionCall] || function.isInstanceOf[Aggregation]) {
-          function
-        } else {
-          val newArgs =
-            args.map(
-            (exp: Expression) =>
-              replaceAggFunctionCall(exp, tableEnv))
-          c.makeCopy(Array(name, newArgs))
+        function match {
+          case a: AggFunctionCall => a
+          case a: Aggregation => a
+          case p: AbstractWindowProperty => p
+          case _ =>
+            val newArgs =
+              args.map(
+                (exp: Expression) =>
+                  replaceAggFunctionCall(exp, tableEnv))
+            c.makeCopy(Array(name, newArgs))
         }
-
       // Scala functions
       case sfc @ ScalarFunctionCall(clazz, args) =>
         val newArgs: Seq[Expression] =

http://git-wip-us.apache.org/repos/asf/flink/blob/41aa98a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index d261e36..1cc156e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.java.{Slide => JSlide}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.utils.TableTestBase
-import org.junit.{Assert, Test}
+import org.junit.Test
 
 
 class GroupWindowStringExpressionTest extends TableTestBase {
@@ -47,7 +47,9 @@ class GroupWindowStringExpressionTest extends TableTestBase {
         myCountFun('string),
         'int.sum,
         weightAvgFun('long, 'int),
-        weightAvgFun('int, 'int) * 2)
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
 
     // String / Java API
     val resJava = t
@@ -55,11 +57,13 @@ class GroupWindowStringExpressionTest extends TableTestBase {
       .groupBy("w, string")
       .select(
         "string, " +
-          "myCountFun(string), " +
-          "int.sum, " +
-          "weightAvgFun(long, int), " +
-          "weightAvgFun(int, int) * 2")
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
 
-    Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan)
+    verifyTableEquals(resJava, resScala)
   }
 }