You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/19 20:41:21 UTC

[1/3] flink git commit: [FLINK-6886] [table] Fix conversion of Row Table to POJO.

Repository: flink
Updated Branches:
  refs/heads/master 06e63386e -> 6cf6cb8dd


[FLINK-6886] [table] Fix conversion of Row Table to POJO.

This closes #4102.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d78eeca3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d78eeca3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d78eeca3

Branch: refs/heads/master
Commit: d78eeca37554ac75faf1aa451d0b4107ebd96fb9
Parents: 06e6338
Author: sunjincheng121 <su...@gmail.com>
Authored: Sat Jun 17 03:25:08 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:41:24 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 22 ++++++++--
 .../table/api/java/stream/sql/SqlITCase.java    |  8 ++--
 .../flink/table/api/java/utils/Pojos.java       | 45 ++++++++++++++++++++
 .../api/scala/stream/TableSourceITCase.scala    |  6 +--
 .../api/scala/stream/sql/OverWindowITCase.scala | 32 +++++++-------
 .../table/api/scala/stream/sql/SqlITCase.scala  | 20 ++++-----
 .../api/scala/stream/table/CalcITCase.scala     | 24 +++++------
 .../table/GroupWindowAggregationsITCase.scala   | 10 ++---
 .../scala/stream/table/OverWindowITCase.scala   | 10 ++---
 .../api/scala/stream/table/UnionITCase.scala    |  8 ++--
 .../api/scala/stream/utils/StreamITCase.scala   |  4 +-
 .../datastream/DataStreamAggregateITCase.scala  | 12 +++---
 .../datastream/DataStreamCalcITCase.scala       |  4 +-
 .../DataStreamUserDefinedFunctionITCase.scala   | 16 +++----
 .../datastream/TimeAttributesITCase.scala       | 45 +++++++++++++++++---
 15 files changed, 179 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index bc5038d..178bd9f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.api
 
 import _root_.java.lang.{Boolean => JBool}
 import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.util.{List => JList}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
 import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
 import org.apache.calcite.sql.SqlKind
@@ -38,7 +39,7 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.calcite.RelTimeIndicatorConverter
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -604,7 +605,22 @@ abstract class StreamTableEnvironment(
       withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
-    translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)
+
+    // zip original field names with optimized field types
+    val fieldTypes = relNode.getRowType.getFieldList.asScala
+      .zip(dataStreamPlan.getRowType.getFieldList.asScala)
+      // get name of original plan and type of optimized plan
+      .map(x => (x._1.getName, x._2.getType))
+      // add field indexes
+      .zipWithIndex
+      // build new field types
+      .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))
+
+    // build a record type from list of field types
+    val rowType = new RelRecordType(
+      fieldTypes.toList.asJava.asInstanceOf[JList[RelDataTypeField]])
+
+    translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 9270221..4f32382 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -71,7 +71,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();
@@ -96,7 +96,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();
@@ -120,7 +120,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();
@@ -151,7 +151,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
new file mode 100644
index 0000000..3048835
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.utils;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+
+/**
+ * POJOs for table api testing.
+ */
+public class Pojos {
+
+	/**
+	 * Pojo1 for test.
+	 */
+	public static class Pojo1 implements Serializable {
+
+		public Timestamp ts;
+		public String msg;
+
+		@Override
+		public String toString() {
+			return "Pojo1{" +
+					"ts=" + ts +
+					", msg='" + msg + '\'' +
+					'}';
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 13ec2b4..a7037bb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -47,7 +47,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     tEnv.sql(
       "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
       .toAppendStream[Row]
-      .addSink(new StreamITCase.StringSink)
+      .addSink(new StreamITCase.StringSink[Row])
 
     env.execute()
 
@@ -72,7 +72,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
       .where('id > 4)
       .select('last, 'score * 2)
       .toAppendStream[Row]
-      .addSink(new StreamITCase.StringSink)
+      .addSink(new StreamITCase.StringSink[Row])
 
     env.execute()
 
@@ -94,7 +94,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     tEnv.scan(tableName)
       .where("amount > 4 && price < 9")
       .select("id, name")
-      .addSink(new StreamITCase.StringSink)
+      .addSink(new StreamITCase.StringSink[Row])
 
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 36eff1e..7a24f50 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }
 
@@ -93,7 +93,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "FROM MyTable"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -135,7 +135,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -206,7 +206,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "as cnt1 from T1)"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -236,7 +236,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -261,7 +261,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
@@ -323,7 +323,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -384,7 +384,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -452,7 +452,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -513,7 +513,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -574,7 +574,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -640,7 +640,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -702,7 +702,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -763,7 +763,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -835,7 +835,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index bdc1fcc..55633ff 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -58,7 +58,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("MyTableRow", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
@@ -100,7 +100,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("MyTable", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("2,0", "4,1", "6,1")
@@ -121,7 +121,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("MyTable", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("3,2,Hello world")
@@ -142,7 +142,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerDataStream("MyTable", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("3,2,Hello world")
@@ -166,7 +166,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T2", t2)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -193,7 +193,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T2", t2)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -219,7 +219,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("Hello", "Hello world")
@@ -244,7 +244,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -276,7 +276,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -306,7 +306,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index b355cf0..adf4d44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -42,7 +42,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -61,7 +61,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("1", "2", "3")
@@ -80,7 +80,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
       .select('a, 'b)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -100,7 +100,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
       .select('a, 'b, 'c)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -119,7 +119,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("no")
@@ -135,7 +135,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("no")
@@ -152,7 +152,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("no")
@@ -172,7 +172,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter('a === 3)
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("3,2,Hello world")
@@ -192,7 +192,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( Literal(false) )
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     assertEquals(true, StreamITCase.testResults.isEmpty)
@@ -211,7 +211,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( Literal(true) )
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -234,7 +234,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( 'a % 2 === 0 )
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -258,7 +258,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( 'a % 2 !== 0)
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
     val expected = mutable.MutableList(
       "1,1,Hi", "3,2,Hello world",

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 87a35bf..d78aea6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -71,7 +71,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
     val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
@@ -113,7 +113,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
@@ -139,7 +139,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
     val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("2,1,1,1", "2,2,6,2")
@@ -167,7 +167,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -203,7 +203,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select(weightAvgFun('long, 'int))
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("12", "8", "2", "3", "1")

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
index 133328e..6f9aad2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -70,7 +70,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, 'mycount, 'wAvg)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -131,7 +131,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         weightAvgFun('b, 'a) over 'w)
 
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -186,7 +186,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
       .select('a, 'c.sum over 'w, 'c.min over 'w)
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -249,7 +249,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
 
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -312,7 +312,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
 
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
index e30540d..3d1704e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -44,7 +44,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2).select('c)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -64,7 +64,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Hi", "Hallo")
@@ -83,7 +83,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     assertEquals(true, StreamITCase.testResults.isEmpty)
@@ -102,7 +102,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     assertEquals(true, StreamITCase.testResults.isEmpty)

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index effde8e..4c6e6f0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -44,8 +44,8 @@ object StreamITCase {
     assertEquals(expected.asScala, StreamITCase.testResults.sorted)
   }
 
-  final class StringSink extends RichSinkFunction[Row]() {
-    def invoke(value: Row) {
+  final class StringSink[T] extends RichSinkFunction[T]() {
+    def invoke(value: T) {
       testResults.synchronized {
         testResults += value.toString
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 3ac664d..8837e03 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -70,7 +70,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -105,7 +105,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -142,7 +142,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -176,7 +176,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -205,7 +205,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -233,7 +233,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
     val expected = Seq(
       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index 12d7202..dcd7800 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -49,7 +49,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
       .select('c)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Hello")
@@ -72,7 +72,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
       .select('c)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Hello", "Hello world")

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index 9efe6a1..53caee3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -55,7 +55,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'name, 'age)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
@@ -72,7 +72,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'd, 'e)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -92,7 +92,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'd, 'e)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19")
@@ -112,7 +112,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('a, 's)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("3,Hello", "3,world")
@@ -136,7 +136,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('a, 's)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -166,7 +166,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -189,7 +189,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
     val result = t.select(func0('c), func1('c),func2('c))
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -211,7 +211,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c)
       .join(varArgsFunc0("1", "2", 'c))
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(

http://git-wip-us.apache.org/repos/asf/flink/blob/d78eeca3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index c25dfdf..73cb701 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.java.utils.Pojos.Pojo1
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
@@ -104,7 +105,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val t = table.select('rowtime.cast(Types.STRING))
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -135,7 +136,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -162,7 +163,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val t = table.join(func('rowtime, 'proctime, 'string) as 's).select('rowtime, 's)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -197,7 +198,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .select('w.rowtime, 's.count)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -223,7 +224,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val t = table.unionAll(table).select('rowtime)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -261,7 +262,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -294,7 +295,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .select('w2.rowtime, 'w2.end, 'int.count)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -306,6 +307,36 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testCalcMaterializationWithPojoType(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+    tEnv.registerTable("T1", table)
+    val querySql = "select rowtime as ts, string as msg from T1"
+
+    val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+    results.addSink(new StreamITCase.StringSink[Pojo1])
+    env.execute()
+
+    val expected = Seq(
+      "Pojo1{ts=1970-01-01 00:00:00.001, msg='Hi'}",
+      "Pojo1{ts=1970-01-01 00:00:00.002, msg='Hallo'}",
+      "Pojo1{ts=1970-01-01 00:00:00.003, msg='Hello'}",
+      "Pojo1{ts=1970-01-01 00:00:00.004, msg='Hello'}",
+      "Pojo1{ts=1970-01-01 00:00:00.007, msg='Hello'}",
+      "Pojo1{ts=1970-01-01 00:00:00.008, msg='Hello world'}",
+      "Pojo1{ts=1970-01-01 00:00:00.016, msg='Hello world'}")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
 
 object TimeAttributesITCase {


[2/3] flink git commit: [FLINK-6602] [table] Prevent TableSources with empty time attribute names.

Posted by fh...@apache.org.
[FLINK-6602] [table] Prevent TableSources with empty time attribute names.

This closes #4135.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/850e4d91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/850e4d91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/850e4d91

Branch: refs/heads/master
Commit: 850e4d913bf33d90409a078dab2fbc26bfa976ce
Parents: d78eeca
Author: zhe li <zhe li>
Authored: Fri Jun 16 23:13:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:41:29 2017 +0200

----------------------------------------------------------------------
 .../plan/schema/StreamTableSourceTable.scala    | 16 +++++++++++--
 .../api/scala/stream/TableSourceTest.scala      | 24 +++++++++++++++++++-
 2 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/850e4d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index fa15288..408381d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -39,7 +39,13 @@ class StreamTableSourceTable[T](
     val fieldCnt = fieldNames.length
 
     val rowtime = tableSource match {
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
+      case nullTimeSource : DefinedRowtimeAttribute
+        if nullTimeSource.getRowtimeAttribute == null =>
+          None
+      case emptyStringTimeSource: DefinedRowtimeAttribute
+        if emptyStringTimeSource.getRowtimeAttribute.trim.equals("")  =>
+          throw TableException("The name of the rowtime attribute must not be empty.")
+      case timeSource: DefinedRowtimeAttribute  =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
         Some((fieldCnt, rowtimeAttribute))
       case _ =>
@@ -47,7 +53,13 @@ class StreamTableSourceTable[T](
     }
 
     val proctime = tableSource match {
-      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
+      case nullTimeSource : DefinedProctimeAttribute
+        if nullTimeSource.getProctimeAttribute == null =>
+          None
+      case emptyStringTimeSource: DefinedProctimeAttribute
+        if emptyStringTimeSource.getProctimeAttribute.trim.equals("")  =>
+          throw TableException("The name of the proctime attribute must not be empty.")
+      case timeSource: DefinedProctimeAttribute  =>
         val proctimeAttribute = timeSource.getProctimeAttribute
         Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
       case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/850e4d91/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 890ad32..32961a6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.table.utils.TableTestBase
@@ -121,6 +121,28 @@ class TableSourceTest extends TableTestBase {
       )
     util.verifyTable(t, expected)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRowtimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+
+    val t = util.tEnv.scan("rowTimeT")
+      .select('id)
+
+    util.tEnv.optimize(t.getRelNode, false)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testProctimeTableSourceWithEmptyName(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
+
+    val t = util.tEnv.scan("procTimeT")
+      .select('id)
+
+    util.tEnv.optimize(t.getRelNode, false)
+  }
 }
 
 class TestRowtimeSource(timeField: String)


[3/3] flink git commit: [FLINK-6941] [table] Validate that start and end window properties are not accessed on over windows.

Posted by fh...@apache.org.
[FLINK-6941] [table] Validate that start and end window properties are not accessed on over windows.

This closes #4137.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cf6cb8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cf6cb8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cf6cb8d

Branch: refs/heads/master
Commit: 6cf6cb8ddbedb5c5e8dcfdbf498cebef305be488
Parents: 850e4d9
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon Jun 19 08:06:15 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 22:41:29 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/org/apache/flink/table/api/table.scala   | 7 ++++++-
 .../flink/table/api/scala/stream/table/OverWindowTest.scala | 9 +++++++++
 .../scala/org/apache/flink/table/utils/TableTestBase.scala  | 8 ++++++++
 3 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6cf6cb8d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 093013e..dd02c0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.table.calcite.FlinkRelBuilder
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical.{Minus, _}
@@ -1004,6 +1004,11 @@ class OverWindowedTable(
       table.logicalPlan,
       table.tableEnv)
 
+    if(fields.exists(_.isInstanceOf[WindowProperty])){
+      throw ValidationException(
+        "Window start and end properties are not available for Over windows.")
+    }
+
     val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
 
     new Table(

http://git-wip-us.apache.org/repos/asf/flink/blob/6cf6cb8d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index 49a210c..e571bae 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -107,6 +107,15 @@ class OverWindowTest extends TableTestBase {
     streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
+  @Test
+  def testAccessesWindowProperties(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("Window start and end properties are not available for Over windows.")
+
+    table
+    .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+    .select('c, 'a.count over 'w, 'w.start, 'w.end)
+  }
 
   @Test
   def testScalarFunctionsOnOverWindow() = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6cf6cb8d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index a1b28d3..402a69d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
 import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
 import org.apache.flink.table.functions.AggregateFunction
 import org.junit.Assert.assertEquals
+import org.junit.Rule
+import org.junit.rules.ExpectedException
 import org.mockito.Mockito.{mock, when}
 
 /**
@@ -40,6 +42,12 @@ import org.mockito.Mockito.{mock, when}
   */
 class TableTestBase {
 
+  // used for accurate exception information checking.
+  val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrown = expectedException
+
   def batchTestUtil(): BatchTableTestUtil = {
     BatchTableTestUtil()
   }