You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/19 20:41:21 UTC
[1/3] flink git commit: [FLINK-6886] [table] Fix conversion of Row
Table to POJO.
Repository: flink
Updated Branches:
refs/heads/master 06e63386e -> 6cf6cb8dd
[FLINK-6886] [table] Fix conversion of Row Table to POJO.
This closes #4102.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d78eeca3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d78eeca3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d78eeca3
Branch: refs/heads/master
Commit: d78eeca37554ac75faf1aa451d0b4107ebd96fb9
Parents: 06e6338
Author: sunjincheng121 <su...@gmail.com>
Authored: Sat Jun 17 03:25:08 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:41:24 2017 +0200
----------------------------------------------------------------------
.../table/api/StreamTableEnvironment.scala | 22 ++++++++--
.../table/api/java/stream/sql/SqlITCase.java | 8 ++--
.../flink/table/api/java/utils/Pojos.java | 45 ++++++++++++++++++++
.../api/scala/stream/TableSourceITCase.scala | 6 +--
.../api/scala/stream/sql/OverWindowITCase.scala | 32 +++++++-------
.../table/api/scala/stream/sql/SqlITCase.scala | 20 ++++-----
.../api/scala/stream/table/CalcITCase.scala | 24 +++++------
.../table/GroupWindowAggregationsITCase.scala | 10 ++---
.../scala/stream/table/OverWindowITCase.scala | 10 ++---
.../api/scala/stream/table/UnionITCase.scala | 8 ++--
.../api/scala/stream/utils/StreamITCase.scala | 4 +-
.../datastream/DataStreamAggregateITCase.scala | 12 +++---
.../datastream/DataStreamCalcITCase.scala | 4 +-
.../DataStreamUserDefinedFunctionITCase.scala | 16 +++----
.../datastream/TimeAttributesITCase.scala | 45 +++++++++++++++++---
15 files changed, 179 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index bc5038d..178bd9f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.api
import _root_.java.lang.{Boolean => JBool}
import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.util.{List => JList}
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
import org.apache.calcite.sql.SqlKind
@@ -38,7 +39,7 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.calcite.RelTimeIndicatorConverter
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -604,7 +605,22 @@ abstract class StreamTableEnvironment(
withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
val relNode = table.getRelNode
val dataStreamPlan = optimize(relNode, updatesAsRetraction)
- translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)
+
+ // zip original field names with optimized field types
+ val fieldTypes = relNode.getRowType.getFieldList.asScala
+ .zip(dataStreamPlan.getRowType.getFieldList.asScala)
+ // get name of original plan and type of optimized plan
+ .map(x => (x._1.getName, x._2.getType))
+ // add field indexes
+ .zipWithIndex
+ // build new field types
+ .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))
+
+ // build a record type from list of field types
+ val rowType = new RelRecordType(
+ fieldTypes.toList.asJava.asInstanceOf[JList[RelDataTypeField]])
+
+ translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 9270221..4f32382 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -71,7 +71,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
Table result = tableEnv.sql(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink());
+ resultSet.addSink(new StreamITCase.StringSink<Row>());
env.execute();
List<String> expected = new ArrayList<>();
@@ -96,7 +96,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
Table result = tableEnv.sql(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink());
+ resultSet.addSink(new StreamITCase.StringSink<Row>());
env.execute();
List<String> expected = new ArrayList<>();
@@ -120,7 +120,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
Table result = tableEnv.sql(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink());
+ resultSet.addSink(new StreamITCase.StringSink<Row>());
env.execute();
List<String> expected = new ArrayList<>();
@@ -151,7 +151,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
Table result = tableEnv.sql(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink());
+ resultSet.addSink(new StreamITCase.StringSink<Row>());
env.execute();
List<String> expected = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
new file mode 100644
index 0000000..3048835
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.utils;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+
+/**
+ * POJOs for table api testing.
+ */
+public class Pojos {
+
+ /**
+ * Pojo1 for test.
+ */
+ public static class Pojo1 implements Serializable {
+
+ public Timestamp ts;
+ public String msg;
+
+ @Override
+ public String toString() {
+ return "Pojo1{" +
+ "ts=" + ts +
+ ", msg='" + msg + '\'' +
+ '}';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 13ec2b4..a7037bb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -47,7 +47,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
tEnv.sql(
"SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
.toAppendStream[Row]
- .addSink(new StreamITCase.StringSink)
+ .addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -72,7 +72,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
.where('id > 4)
.select('last, 'score * 2)
.toAppendStream[Row]
- .addSink(new StreamITCase.StringSink)
+ .addSink(new StreamITCase.StringSink[Row])
env.execute()
@@ -94,7 +94,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
tEnv.scan(tableName)
.where("amount > 4 && price < 9")
.select("id, name")
- .addSink(new StreamITCase.StringSink)
+ .addSink(new StreamITCase.StringSink[Row])
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 36eff1e..7a24f50 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
@@ -93,7 +93,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"FROM MyTable"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -135,7 +135,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
"FROM MyTable"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -206,7 +206,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"as cnt1 from T1)"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -236,7 +236,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -261,7 +261,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"from T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
@@ -323,7 +323,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" FROM T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -384,7 +384,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"FROM T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -452,7 +452,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
" FROM T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -513,7 +513,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"FROM T1"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -574,7 +574,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -640,7 +640,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -702,7 +702,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -763,7 +763,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -835,7 +835,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T1", t1)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index bdc1fcc..55633ff 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -58,7 +58,7 @@ class SqlITCase extends StreamingWithStateTestBase {
tEnv.registerTable("MyTableRow", t)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
@@ -100,7 +100,7 @@ class SqlITCase extends StreamingWithStateTestBase {
tEnv.registerTable("MyTable", t)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List("2,0", "4,1", "6,1")
@@ -121,7 +121,7 @@ class SqlITCase extends StreamingWithStateTestBase {
tEnv.registerTable("MyTable", t)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List("3,2,Hello world")
@@ -142,7 +142,7 @@ class SqlITCase extends StreamingWithStateTestBase {
tEnv.registerDataStream("MyTable", t)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List("3,2,Hello world")
@@ -166,7 +166,7 @@ class SqlITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T2", t2)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -193,7 +193,7 @@ class SqlITCase extends StreamingWithStateTestBase {
tEnv.registerTable("T2", t2)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -219,7 +219,7 @@ class SqlITCase extends StreamingWithStateTestBase {
tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List("Hello", "Hello world")
@@ -244,7 +244,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -276,7 +276,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
@@ -306,7 +306,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index b355cf0..adf4d44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -42,7 +42,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -61,7 +61,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("1", "2", "3")
@@ -80,7 +80,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
.select('a, 'b)
val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -100,7 +100,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
.select('a, 'b, 'c)
val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -119,7 +119,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("no")
@@ -135,7 +135,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("no")
@@ -152,7 +152,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("no")
@@ -172,7 +172,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val filterDs = ds.filter('a === 3)
val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("3,2,Hello world")
@@ -192,7 +192,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val filterDs = ds.filter( Literal(false) )
val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
assertEquals(true, StreamITCase.testResults.isEmpty)
@@ -211,7 +211,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val filterDs = ds.filter( Literal(true) )
val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -234,7 +234,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val filterDs = ds.filter( 'a % 2 === 0 )
val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -258,7 +258,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
val filterDs = ds.filter( 'a % 2 !== 0)
val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
"1,1,Hi", "3,2,Hello world",
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 87a35bf..d78aea6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -71,7 +71,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
val results = windowedTable.toAppendStream[Row](queryConfig)
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
@@ -113,7 +113,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
@@ -139,7 +139,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
val results = windowedTable.toAppendStream[Row](queryConfig)
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq("2,1,1,1", "2,2,6,2")
@@ -167,7 +167,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -203,7 +203,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.select(weightAvgFun('long, 'int))
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq("12", "8", "2", "3", "1")
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
index 133328e..6f9aad2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -70,7 +70,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.select('c, 'mycount, 'wAvg)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -131,7 +131,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
weightAvgFun('b, 'a) over 'w)
val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -186,7 +186,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
.select('a, 'c.sum over 'w, 'c.min over 'w)
val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -249,7 +249,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -312,7 +312,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
index e30540d..3d1704e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -44,7 +44,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2).select('c)
val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -64,7 +64,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("Hi", "Hallo")
@@ -83,7 +83,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2)
val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
assertEquals(true, StreamITCase.testResults.isEmpty)
@@ -102,7 +102,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
val unionDs = ds1.unionAll(ds2)
val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
assertEquals(true, StreamITCase.testResults.isEmpty)
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index effde8e..4c6e6f0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -44,8 +44,8 @@ object StreamITCase {
assertEquals(expected.asScala, StreamITCase.testResults.sorted)
}
- final class StringSink extends RichSinkFunction[Row]() {
- def invoke(value: Row) {
+ final class StringSink[T] extends RichSinkFunction[T]() {
+ def invoke(value: T) {
testResults.synchronized {
testResults += value.toString
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 3ac664d..8837e03 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -70,7 +70,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.select('int.count, 'w.start, 'w.end)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -105,7 +105,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -142,7 +142,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -176,7 +176,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -205,7 +205,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -233,7 +233,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
.select('string, 'int.count, 'w.start, 'w.end)
val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
"Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index 12d7202..dcd7800 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -49,7 +49,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
.select('c)
val results = result.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("Hello")
@@ -72,7 +72,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
.select('c)
val results = result.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("Hello", "Hello world")
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index 9efe6a1..53caee3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -55,7 +55,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('c, 'name, 'age)
.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
@@ -72,7 +72,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('c, 'd, 'e)
.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -92,7 +92,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('c, 'd, 'e)
.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19")
@@ -112,7 +112,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('a, 's)
val results = result.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList("3,Hello", "3,world")
@@ -136,7 +136,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('a, 's)
val results = result.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -166,7 +166,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('c, 'd, 'f, 'h, 'e, 'g, 'i)
.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -189,7 +189,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
val result = t.select(func0('c), func1('c),func2('c))
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
@@ -211,7 +211,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
.select('c)
.join(varArgsFunc0("1", "2", 'c))
- result.addSink(new StreamITCase.StringSink)
+ result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index c25dfdf..73cb701 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.java.utils.Pojos.Pojo1
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.StreamITCase
import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
@@ -104,7 +105,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val t = table.select('rowtime.cast(Types.STRING))
val results = t.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -135,7 +136,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
.select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
val results = t.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -162,7 +163,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val t = table.join(func('rowtime, 'proctime, 'string) as 's).select('rowtime, 's)
val results = t.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -197,7 +198,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
.select('w.rowtime, 's.count)
val results = t.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -223,7 +224,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
val t = table.unionAll(table).select('rowtime)
val results = t.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -261,7 +262,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
"GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
val results = t.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -294,7 +295,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
.select('w2.rowtime, 'w2.end, 'int.count)
val results = t.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = Seq(
@@ -306,6 +307,36 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testCalcMaterializationWithPojoType(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ tEnv.registerTable("T1", table)
+ val querySql = "select rowtime as ts, string as msg from T1"
+
+ val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+ results.addSink(new StreamITCase.StringSink[Pojo1])
+ env.execute()
+
+ val expected = Seq(
+ "Pojo1{ts=1970-01-01 00:00:00.001, msg='Hi'}",
+ "Pojo1{ts=1970-01-01 00:00:00.002, msg='Hallo'}",
+ "Pojo1{ts=1970-01-01 00:00:00.003, msg='Hello'}",
+ "Pojo1{ts=1970-01-01 00:00:00.004, msg='Hello'}",
+ "Pojo1{ts=1970-01-01 00:00:00.007, msg='Hello'}",
+ "Pojo1{ts=1970-01-01 00:00:00.008, msg='Hello world'}",
+ "Pojo1{ts=1970-01-01 00:00:00.016, msg='Hello world'}")
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
}
object TimeAttributesITCase {
[2/3] flink git commit: [FLINK-6602] [table] Prevent TableSources
with empty time attribute names.
Posted by fh...@apache.org.
[FLINK-6602] [table] Prevent TableSources with empty time attribute names.
This closes #4135.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/850e4d91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/850e4d91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/850e4d91
Branch: refs/heads/master
Commit: 850e4d913bf33d90409a078dab2fbc26bfa976ce
Parents: d78eeca
Author: zhe li <zhe li>
Authored: Fri Jun 16 23:13:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:41:29 2017 +0200
----------------------------------------------------------------------
.../plan/schema/StreamTableSourceTable.scala | 16 +++++++++++--
.../api/scala/stream/TableSourceTest.scala | 24 +++++++++++++++++++-
2 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/850e4d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index fa15288..408381d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -39,7 +39,13 @@ class StreamTableSourceTable[T](
val fieldCnt = fieldNames.length
val rowtime = tableSource match {
- case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
+ case nullTimeSource : DefinedRowtimeAttribute
+ if nullTimeSource.getRowtimeAttribute == null =>
+ None
+ case emptyStringTimeSource: DefinedRowtimeAttribute
+ if emptyStringTimeSource.getRowtimeAttribute.trim.equals("") =>
+ throw TableException("The name of the rowtime attribute must not be empty.")
+ case timeSource: DefinedRowtimeAttribute =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
Some((fieldCnt, rowtimeAttribute))
case _ =>
@@ -47,7 +53,13 @@ class StreamTableSourceTable[T](
}
val proctime = tableSource match {
- case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
+ case nullTimeSource : DefinedProctimeAttribute
+ if nullTimeSource.getProctimeAttribute == null =>
+ None
+ case emptyStringTimeSource: DefinedProctimeAttribute
+ if emptyStringTimeSource.getProctimeAttribute.trim.equals("") =>
+ throw TableException("The name of the proctime attribute must not be empty.")
+ case timeSource: DefinedProctimeAttribute =>
val proctimeAttribute = timeSource.getProctimeAttribute
Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
case _ =>
http://git-wip-us.apache.org/repos/asf/flink/blob/850e4d91/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 890ad32..32961a6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.table.utils.TableTestBase
@@ -121,6 +121,28 @@ class TableSourceTest extends TableTestBase {
)
util.verifyTable(t, expected)
}
+
+ @Test(expected = classOf[TableException])
+ def testRowtimeTableSourceWithEmptyName(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+
+ val t = util.tEnv.scan("rowTimeT")
+ .select('id)
+
+ util.tEnv.optimize(t.getRelNode, false)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testProctimeTableSourceWithEmptyName(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
+
+ val t = util.tEnv.scan("procTimeT")
+ .select('id)
+
+ util.tEnv.optimize(t.getRelNode, false)
+ }
}
class TestRowtimeSource(timeField: String)
[3/3] flink git commit: [FLINK-6941] [table] Validate that start and
end window properties are not accessed on over windows.
Posted by fh...@apache.org.
[FLINK-6941] [table] Validate that start and end window properties are not accessed on over windows.
This closes #4137.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cf6cb8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cf6cb8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cf6cb8d
Branch: refs/heads/master
Commit: 6cf6cb8ddbedb5c5e8dcfdbf498cebef305be488
Parents: 850e4d9
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon Jun 19 08:06:15 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:41:29 2017 +0200
----------------------------------------------------------------------
.../src/main/scala/org/apache/flink/table/api/table.scala | 7 ++++++-
.../flink/table/api/scala/stream/table/OverWindowTest.scala | 9 +++++++++
.../scala/org/apache/flink/table/utils/TableTestBase.scala | 8 ++++++++
3 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6cf6cb8d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 093013e..dd02c0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.table.calcite.FlinkRelBuilder
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
import org.apache.flink.table.plan.logical.{Minus, _}
@@ -1004,6 +1004,11 @@ class OverWindowedTable(
table.logicalPlan,
table.tableEnv)
+ if(fields.exists(_.isInstanceOf[WindowProperty])){
+ throw ValidationException(
+ "Window start and end properties are not available for Over windows.")
+ }
+
val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
new Table(
http://git-wip-us.apache.org/repos/asf/flink/blob/6cf6cb8d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index 49a210c..e571bae 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -107,6 +107,15 @@ class OverWindowTest extends TableTestBase {
streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
+ @Test
+ def testAccessesWindowProperties(): Unit = {
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage("Window start and end properties are not available for Over windows.")
+
+ table
+ .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+ .select('c, 'a.count over 'w, 'w.start, 'w.end)
+ }
@Test
def testScalarFunctionsOnOverWindow() = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6cf6cb8d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index a1b28d3..402a69d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
import org.apache.flink.table.functions.AggregateFunction
import org.junit.Assert.assertEquals
+import org.junit.Rule
+import org.junit.rules.ExpectedException
import org.mockito.Mockito.{mock, when}
/**
@@ -40,6 +42,12 @@ import org.mockito.Mockito.{mock, when}
*/
class TableTestBase {
+ // used for accurate exception information checking.
+ val expectedException = ExpectedException.none()
+
+ @Rule
+ def thrown = expectedException
+
def batchTestUtil(): BatchTableTestUtil = {
BatchTableTestUtil()
}