You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/18 19:22:56 UTC
[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group
auxiliary functions (e.g., TUMBLE_END).
Repository: flink
Updated Branches:
refs/heads/master c995ebd29 -> 9fc42df68
[FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).
This closes #3930.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc42df6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc42df6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc42df6
Branch: refs/heads/master
Commit: 9fc42df68d746c633b0d3c8995e0031064bfd362
Parents: 3a65e5a
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed May 17 16:26:27 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200
----------------------------------------------------------------------
.../common/WindowStartEndPropertiesRule.scala | 39 ++++++++++++++------
.../scala/stream/sql/WindowAggregateTest.scala | 27 ++++++++++++++
2 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 7577deb..14e9b21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule
override def matches(call: RelOptRuleCall): Boolean = {
val project = call.rel(0).asInstanceOf[LogicalProject]
// project includes at least one group auxiliary function
- project.getProjects.exists {
- case c: RexCall => c.getOperator.isGroupAuxiliary
- case _ => false
+
+ def hasGroupAuxiliaries(node: RexNode): Boolean = {
+ node match {
+ case c: RexCall if c.getOperator.isGroupAuxiliary => true
+ case c: RexCall =>
+ c.operands.exists(hasGroupAuxiliaries)
+ case _ => false
+ }
}
+
+ project.getProjects.exists(hasGroupAuxiliaries)
}
override def onMatch(call: RelOptRuleCall): Unit = {
+
val project = call.rel(0).asInstanceOf[LogicalProject]
val innerProject = call.rel(1).asInstanceOf[LogicalProject]
val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
@@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule
transformed.project(
innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
- // replace window auxiliary function by access to window properties
- transformed.project(
- project.getProjects.map{ x =>
- if (WindowStartEndPropertiesRule.isWindowStart(x)) {
+ def replaceGroupAuxiliaries(node: RexNode): RexNode = {
+ node match {
+ case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
// replace expression by access to window start
- rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
- } else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
+ rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
+ case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
// replace expression by access to window end
- rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
- } else {
+ rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
+ case c: RexCall =>
+ // replace expressions in children
+ val newOps = c.getOperands.map(replaceGroupAuxiliaries)
+ c.clone(c.getType, newOps)
+ case x =>
// preserve expression
x
- }
}
+ }
+
+ // replace window auxiliary function by access to window properties
+ transformed.project(
+ project.getProjects.map(replaceGroupAuxiliaries)
)
val res = transformed.build()
call.transformTo(res)
http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 2022db8..f95d0ab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -150,6 +150,33 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
+ @Test
+ def testExpressionOnWindowAuxFunction() = {
+ val sql =
+ "SELECT " +
+ " COUNT(*), " +
+ " TUMBLE_END(rowtime, INTERVAL '15' MINUTE) + INTERVAL '1' MINUTE " +
+ "FROM MyTable " +
+ "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "rowtime")
+ ),
+ term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+ term("select", "COUNT(*) AS EXPR$0", "start('w$) AS w$start", "end('w$) AS w$end")
+ ),
+ term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS $f1")
+ )
+
+ streamUtil.verifySql(sql, expected)
+ }
+
@Test(expected = classOf[TableException])
def testTumbleWindowNoOffset(): Unit = {
val sqlQuery =
[2/4] flink git commit: [FLINK-6585] [table] Fix execution of Table
examples in IDE.
Posted by fh...@apache.org.
[FLINK-6585] [table] Fix execution of Table examples in IDE.
This closes #3905.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5310ed0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5310ed0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5310ed0
Branch: refs/heads/master
Commit: d5310ed02b30ef16f06be6efc52af5e183df26a7
Parents: 41aa98a
Author: twalthr <tw...@apache.org>
Authored: Mon May 15 13:29:50 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200
----------------------------------------------------------------------
flink-examples/flink-examples-table/pom.xml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d5310ed0/flink-examples/flink-examples-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index e59d8c6..8574650 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -39,14 +39,14 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <scope>compile</scope>
</dependency>
</dependencies>
[3/4] flink git commit: [FLINK-6543] [table] Deprecate toDataStream
and add toAppendStream.
Posted by fh...@apache.org.
[FLINK-6543] [table] Deprecate toDataStream and add toAppendStream.
This closes #3929.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a65e5ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a65e5ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a65e5ac
Branch: refs/heads/master
Commit: 3a65e5acbcc29636b0ce1631815861089fc21dca
Parents: d5310ed
Author: twalthr <tw...@apache.org>
Authored: Wed May 17 11:31:33 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200
----------------------------------------------------------------------
.../table/examples/scala/StreamSQLExample.scala | 2 +-
.../examples/scala/StreamTableExample.scala | 2 +-
.../flink/table/api/java/package-info.java | 5 +-
.../table/api/java/StreamTableEnvironment.scala | 121 ++++++++++++++++++-
.../api/scala/StreamTableEnvironment.scala | 55 ++++++++-
.../table/api/scala/TableConversions.scala | 96 +++++++++++++--
.../apache/flink/table/api/scala/package.scala | 2 +-
.../table/api/java/stream/sql/SqlITCase.java | 8 +-
.../api/scala/stream/TableSourceITCase.scala | 4 +-
.../api/scala/stream/sql/OverWindowITCase.scala | 32 ++---
.../table/api/scala/stream/sql/SqlITCase.scala | 20 +--
.../api/scala/stream/table/CalcITCase.scala | 24 ++--
.../table/GroupWindowAggregationsITCase.scala | 10 +-
.../scala/stream/table/OverWindowITCase.scala | 10 +-
.../api/scala/stream/table/UnionITCase.scala | 8 +-
.../datastream/DataStreamAggregateITCase.scala | 12 +-
.../datastream/DataStreamCalcITCase.scala | 4 +-
.../DataStreamUserDefinedFunctionITCase.scala | 12 +-
.../datastream/TimeAttributesITCase.scala | 12 +-
19 files changed, 339 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index 2cdd8b8..665913e 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -62,7 +62,7 @@ object StreamSQLExample {
"SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
"SELECT * FROM OrderB WHERE amount < 2")
- result.toDataStream[Order].print()
+ result.toAppendStream[Order].print()
env.execute()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
index 5c5012b..1c0ffea 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
@@ -55,7 +55,7 @@ object StreamTableExample {
val result: DataStream[Order] = orderA.unionAll(orderB)
.select('user, 'product, 'amount)
.where('amount > 2)
- .toDataStream[Order]
+ .toAppendStream[Order]
result.print()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
index 2409872..3dbf50f 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
@@ -57,8 +57,9 @@
* <p>
* As seen above, a {@link org.apache.flink.table.api.Table} can be converted back to the
* underlying API representation using
- * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)}
- * or {@link org.apache.flink.table.api.java.StreamTableEnvironment#toDataStream(Table, java.lang.Class)}}.
+ * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)},
+ * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toAppendStream(Table, java.lang.Class)}}, or
+ * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toRetractStream(Table, java.lang.Class)}}.
*/
package org.apache.flink.table.api.java;
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 311986c..be94df9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -144,13 +144,122 @@ class StreamTableEnvironment(
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the type of the resulting [[DataStream]].
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ * @deprecated This method only supports conversion of append-only tables. In order to
+ * make this more explicit in the future, please use toAppendStream() instead.
+ */
+ @Deprecated
+ def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz)
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ * @deprecated This method only supports conversion of append-only tables. In order to
+ * make this more explicit in the future, please use toAppendStream() instead.
+ */
+ @Deprecated
+ def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] =
+ toAppendStream(table, typeInfo)
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the type of the resulting [[DataStream]].
+ * @param queryConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ * @deprecated This method only supports conversion of append-only tables. In order to
+ * make this more explicit in the future, please use toAppendStream() instead.
+ */
+ @Deprecated
+ def toDataStream[T](
+ table: Table,
+ clazz: Class[T],
+ queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig)
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+ * @param queryConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ * @deprecated This method only supports conversion of append-only tables. In order to
+ * make this more explicit in the future, please use toAppendStream() instead.
+ */
+ @Deprecated
+ def toDataStream[T](
+ table: Table,
+ typeInfo: TypeInformation[T],
+ queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, typeInfo, queryConfig)
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
* @param table The [[Table]] to convert.
* @param clazz The class of the type of the resulting [[DataStream]].
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
- toDataStream(table, clazz, queryConfig)
+ def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+ toAppendStream(table, clazz, queryConfig)
}
/**
@@ -169,8 +278,8 @@ class StreamTableEnvironment(
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
- toDataStream(table, typeInfo, queryConfig)
+ def toAppendStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+ toAppendStream(table, typeInfo, queryConfig)
}
/**
@@ -190,7 +299,7 @@ class StreamTableEnvironment(
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T](
+ def toAppendStream[T](
table: Table,
clazz: Class[T],
queryConfig: StreamQueryConfig): DataStream[T] = {
@@ -216,7 +325,7 @@ class StreamTableEnvironment(
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T](
+ def toAppendStream[T](
table: Table,
typeInfo: TypeInformation[T],
queryConfig: StreamQueryConfig): DataStream[T] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 8c6b273..bfd443a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -138,13 +138,17 @@ class StreamTableEnvironment(
* types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
* @param table The [[Table]] to convert.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
- toDataStream(table, queryConfig)
- }
+ @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+ " more explicit in the future, please use toAppendStream() instead.")
+ def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table)
/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
@@ -157,13 +161,58 @@ class StreamTableEnvironment(
* types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
* @param table The [[Table]] to convert.
* @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
+ @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+ " more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation](
table: Table,
+ queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig)
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
+ toAppendStream(table, queryConfig)
+ }
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param queryConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toAppendStream[T: TypeInformation](
+ table: Table,
queryConfig: StreamQueryConfig): DataStream[T] = {
val returnType = createTypeInformation[T]
asScalaStream(translate(
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 9874a9e..bd431eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -32,7 +32,17 @@ import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTa
*/
class TableConversions(table: Table) {
- /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
+ /**
+ * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+ *
+ * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+ *
+ * @tparam T The type of the resulting [[DataSet]].
+ * @return The converted [[DataSet]].
+ */
def toDataSet[T: TypeInformation]: DataSet[T] = {
table.tableEnv match {
@@ -44,12 +54,71 @@ class TableConversions(table: Table) {
}
}
- /** Converts the [[Table]] to a [[DataStream]] of the specified type. */
- def toDataStream[T: TypeInformation]: DataStream[T] = {
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+ " more explicit in the future, please use toAppendStream() instead.")
+ def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * NOTE: This method only supports conversion of append-only tables. In order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
+ * @param queryConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+ " more explicit in the future, please use toAppendStream() instead.")
+ def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] =
+ toAppendStream(queryConfig)
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toAppendStream[T: TypeInformation]: DataStream[T] = {
table.tableEnv match {
case tEnv: ScalaStreamTableEnv =>
- tEnv.toDataStream(table)
+ tEnv.toAppendStream(table)
case _ =>
throw new TableException(
"Only tables that originate from Scala DataStreams " +
@@ -57,14 +126,25 @@ class TableConversions(table: Table) {
}
}
- /** Converts the [[Table]] to a [[DataStream]] of the specified type.
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
- * @param queryConfig The configuration for the generated query.
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param queryConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
*/
- def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
+ def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
table.tableEnv match {
case tEnv: ScalaStreamTableEnv =>
- tEnv.toDataStream(table, queryConfig)
+ tEnv.toAppendStream(table, queryConfig)
case _ =>
throw new TableException(
"Only tables that originate from Scala DataStreams " +
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
index e8a2017..9d15c14 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -87,7 +87,7 @@ package object scala extends ImplicitExpressionConversions {
implicit def table2RowDataStream(table: Table): DataStream[Row] = {
val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv]
- tableEnv.toDataStream[Row](table)
+ tableEnv.toAppendStream[Row](table)
}
implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 0c0b37e..d827cd6 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -66,7 +66,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
String sqlQuery = "SELECT a,c FROM MyTableRow";
Table result = tableEnv.sql(sqlQuery);
- DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink());
env.execute();
@@ -91,7 +91,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
String sqlQuery = "SELECT * FROM MyTable";
Table result = tableEnv.sql(sqlQuery);
- DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink());
env.execute();
@@ -115,7 +115,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
Table result = tableEnv.sql(sqlQuery);
- DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink());
env.execute();
@@ -146,7 +146,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
"(SELECT a, b, c FROM T2 WHERE a < 3)";
Table result = tableEnv.sql(sqlQuery);
- DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+ DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink());
env.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 9298266..13ec2b4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -46,7 +46,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
tEnv.sql(
"SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
- .toDataStream[Row]
+ .toAppendStream[Row]
.addSink(new StreamITCase.StringSink)
env.execute()
@@ -71,7 +71,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
tEnv.scan("csvTable")
.where('id > 4)
.select('last, 'score * 2)
- .toDataStream[Row]
+ .toAppendStream[Row]
.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index a7fe1c4..7ba5c16 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -67,7 +67,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
}
@@ -92,7 +92,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
"FROM MyTable"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -134,7 +134,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" MIN(c) OVER (" +
" ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
"FROM MyTable"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -177,7 +177,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -204,7 +204,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
"from T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -234,7 +234,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -259,7 +259,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
"from T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -321,7 +321,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
" FROM T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -382,7 +382,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
"FROM T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -450,7 +450,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
" FROM T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -511,7 +511,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
"FROM T1"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -572,7 +572,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -638,7 +638,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -700,7 +700,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -761,7 +761,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -836,7 +836,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index ba8c185..bdc1fcc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -57,7 +57,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = ds.toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTableRow", t)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -99,7 +99,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -120,7 +120,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -141,7 +141,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t = StreamTestData.getSmall3TupleDataStream(env)
tEnv.registerDataStream("MyTable", t)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -165,7 +165,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -192,7 +192,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T2", t2)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -218,7 +218,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val t2 = StreamTestData.get3TupleDataStream(env)
tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -243,7 +243,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -275,7 +275,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -305,7 +305,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index 1114cf0..b355cf0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -41,7 +41,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.testResults = mutable.MutableList()
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
- val results = ds.toDataStream[Row]
+ val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -60,7 +60,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.testResults = mutable.MutableList()
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
- val results = ds.toDataStream[Row]
+ val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -79,7 +79,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
.select('_1 as 'a, '_2 as 'b, '_1 as 'c)
.select('a, 'b)
- val results = ds.toDataStream[Row]
+ val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -99,7 +99,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
.select('a, 'b, 'c)
- val results = ds.toDataStream[Row]
+ val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -118,7 +118,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.testResults = mutable.MutableList()
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
- val results = ds.toDataStream[Row]
+ val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -134,7 +134,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.testResults = mutable.MutableList()
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
- val results = ds.toDataStream[Row]
+ val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -151,7 +151,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.testResults = mutable.MutableList()
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
- val results = ds.toDataStream[Row]
+ val results = ds.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -171,7 +171,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter('a === 3)
- val results = filterDs.toDataStream[Row]
+ val results = filterDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -191,7 +191,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( Literal(false) )
- val results = filterDs.toDataStream[Row]
+ val results = filterDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -210,7 +210,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( Literal(true) )
- val results = filterDs.toDataStream[Row]
+ val results = filterDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -233,7 +233,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'a % 2 === 0 )
- val results = filterDs.toDataStream[Row]
+ val results = filterDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -257,7 +257,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val filterDs = ds.filter( 'a % 2 !== 0)
- val results = filterDs.toDataStream[Row]
+ val results = filterDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
val expected = mutable.MutableList(
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 81d3577..87a35bf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -70,7 +70,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.select('string, countFun('int), 'int.avg,
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
- val results = windowedTable.toDataStream[Row](queryConfig)
+ val results = windowedTable.toAppendStream[Row](queryConfig)
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -112,7 +112,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.select('string, countFun('int), 'int.avg,
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -138,7 +138,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.select(countFun('string), 'int.avg,
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
- val results = windowedTable.toDataStream[Row](queryConfig)
+ val results = windowedTable.toAppendStream[Row](queryConfig)
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -166,7 +166,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -202,7 +202,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w, 'int2, 'int3, 'string)
.select(weightAvgFun('long, 'int))
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
index b097767..f396896 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg)
.select('c, 'mycount, 'wAvg)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -123,7 +123,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
'b.min over 'w,
weightAvgFun('b, 'a) over 'w)
- val result = windowedTable.toDataStream[Row]
+ val result = windowedTable.toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
val windowedTable = table
.window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
.select('a, 'c.sum over 'w, 'c.min over 'w)
- val result = windowedTable.toDataStream[Row]
+ val result = windowedTable.toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -241,7 +241,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
.select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
- val result = windowedTable.toDataStream[Row]
+ val result = windowedTable.toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -304,7 +304,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w)
.select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
- val result = windowedTable.toDataStream[Row]
+ val result = windowedTable.toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
index f35ee76..2b496e3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -43,7 +43,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2).select('c)
- val results = unionDs.toDataStream[Row]
+ val results = unionDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -63,7 +63,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
- val results = unionDs.toDataStream[Row]
+ val results = unionDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -82,7 +82,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2)
- val results = unionDs.toDataStream[Row]
+ val results = unionDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -101,7 +101,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2)
- val results = unionDs.toDataStream[Row]
+ val results = unionDs.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 05e1892..3ac664d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -69,7 +69,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w)
.select('int.count, 'w.start, 'w.end)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -104,7 +104,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -141,7 +141,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -175,7 +175,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -204,7 +204,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -232,7 +232,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
val expected = Seq(
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index 1d48f2c..12d7202 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -48,7 +48,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
.where("RichFunc2(c)='ABC#Hello'")
.select('c)
- val results = result.toDataStream[Row]
+ val results = result.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -71,7 +71,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
.where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2")
.select('c)
- val results = result.toDataStream[Row]
+ val results = result.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index b3d9c6f..9efe6a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -53,7 +53,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.join(pojoFunc0('c))
.where('age > 20)
.select('c, 'name, 'age)
- .toDataStream[Row]
+ .toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -70,7 +70,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
val result = t
.leftOuterJoin(func0('c) as('d, 'e))
.select('c, 'd, 'e)
- .toDataStream[Row]
+ .toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -90,7 +90,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.join(func0('c) as('d, 'e))
.where(Func18('d, "J"))
.select('c, 'd, 'e)
- .toDataStream[Row]
+ .toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
@@ -111,7 +111,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.join(tableFunc1('c) as 's)
.select('a, 's)
- val results = result.toDataStream[Row]
+ val results = result.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -135,7 +135,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.join(tableFunc1(richFunc2('c)) as 's)
.select('a, 's)
- val results = result.toDataStream[Row]
+ val results = result.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -164,7 +164,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('c, 'd, 'e, 'f, 'g)
.join(func32('c) as ('h, 'i))
.select('c, 'd, 'f, 'h, 'e, 'g, 'i)
- .toDataStream[Row]
+ .toAppendStream[Row]
result.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index 3f12218..f2d6175 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -103,7 +103,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val t = table.select('rowtime.cast(Types.STRING))
- val results = t.toDataStream[Row]
+ val results = t.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -134,7 +134,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
.filter('rowtime.cast(Types.LONG) > 4)
.select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
- val results = t.toDataStream[Row]
+ val results = t.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -161,7 +161,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
- val results = t.toDataStream[Row]
+ val results = t.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -191,7 +191,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val t = table.unionAll(table).select('rowtime)
- val results = t.toDataStream[Row]
+ val results = t.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
"GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
- val results = t.toDataStream[Row]
+ val results = t.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -262,7 +262,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
.groupBy('w2)
.select('w2.rowtime, 'w2.end, 'int.count)
- val results = t.toDataStream[Row]
+ val results = t.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
[4/4] flink git commit: [FLINK-6618] [table] Fix translation of
WindowProperties in Table API.
Posted by fh...@apache.org.
[FLINK-6618] [table] Fix translation of WindowProperties in Table API.
This closes #3936.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41aa98a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41aa98a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41aa98a2
Branch: refs/heads/master
Commit: 41aa98a2189ea86ba9464fb0348463af822e332d
Parents: c995ebd
Author: sunjincheng121 <su...@gmail.com>
Authored: Thu May 18 13:02:24 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu May 18 21:22:12 2017 +0200
----------------------------------------------------------------------
.../flink/table/plan/ProjectionTranslator.scala | 20 ++++++++++----------
.../GroupWindowStringExpressionTest.scala | 18 +++++++++++-------
2 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/41aa98a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index 802768e..69b437a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -334,20 +334,20 @@ object ProjectionTranslator {
val l = replaceAggFunctionCall(b.left, tableEnv)
val r = replaceAggFunctionCall(b.right, tableEnv)
b.makeCopy(Array(l, r))
-
// Functions calls
case c @ Call(name, args) =>
val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
- if (function.isInstanceOf[AggFunctionCall] || function.isInstanceOf[Aggregation]) {
- function
- } else {
- val newArgs =
- args.map(
- (exp: Expression) =>
- replaceAggFunctionCall(exp, tableEnv))
- c.makeCopy(Array(name, newArgs))
+ function match {
+ case a: AggFunctionCall => a
+ case a: Aggregation => a
+ case p: AbstractWindowProperty => p
+ case _ =>
+ val newArgs =
+ args.map(
+ (exp: Expression) =>
+ replaceAggFunctionCall(exp, tableEnv))
+ c.makeCopy(Array(name, newArgs))
}
-
// Scala functions
case sfc @ ScalarFunctionCall(clazz, args) =>
val newArgs: Seq[Expression] =
http://git-wip-us.apache.org/repos/asf/flink/blob/41aa98a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index d261e36..1cc156e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.java.{Slide => JSlide}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.aggfunctions.CountAggFunction
import org.apache.flink.table.utils.TableTestBase
-import org.junit.{Assert, Test}
+import org.junit.Test
class GroupWindowStringExpressionTest extends TableTestBase {
@@ -47,7 +47,9 @@ class GroupWindowStringExpressionTest extends TableTestBase {
myCountFun('string),
'int.sum,
weightAvgFun('long, 'int),
- weightAvgFun('int, 'int) * 2)
+ weightAvgFun('int, 'int) * 2,
+ 'w.start,
+ 'w.end)
// String / Java API
val resJava = t
@@ -55,11 +57,13 @@ class GroupWindowStringExpressionTest extends TableTestBase {
.groupBy("w, string")
.select(
"string, " +
- "myCountFun(string), " +
- "int.sum, " +
- "weightAvgFun(long, int), " +
- "weightAvgFun(int, int) * 2")
+ "myCountFun(string), " +
+ "int.sum, " +
+ "weightAvgFun(long, int), " +
+ "weightAvgFun(int, int) * 2, " +
+ "start(w)," +
+ "end(w)")
- Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan)
+ verifyTableEquals(resJava, resScala)
}
}