You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/16 10:04:33 UTC
flink git commit: [FLINK-7698] [table] Tests joins with null literals
Repository: flink
Updated Branches:
refs/heads/master d0b2aa28d -> 101fef739
[FLINK-7698] [table] Tests joins with null literals
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/101fef73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/101fef73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/101fef73
Branch: refs/heads/master
Commit: 101fef7397128b0aba23221481ab86f62b18118f
Parents: d0b2aa2
Author: twalthr <tw...@apache.org>
Authored: Thu Nov 16 11:00:30 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Thu Nov 16 11:00:30 2017 +0100
----------------------------------------------------------------------
.../flink/table/api/stream/sql/JoinTest.scala | 45 ++++++++++++++++++++
1 file changed, 45 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/101fef73/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 8c1865c..c14b698 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -19,7 +19,9 @@ package org.apache.flink.table.api.stream.sql
import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Null
import org.apache.flink.table.plan.logical.TumblingGroupWindow
import org.apache.flink.table.runtime.join.WindowJoinUtil
import org.apache.flink.table.utils.TableTestUtil.{term, _}
@@ -245,6 +247,49 @@ class JoinTest extends TableTestBase {
}
@Test
+ def testJoinWithNullLiteral(): Unit = {
+ val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+ val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)
+
+ val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime, 12L as 'nullField)
+
+ streamUtil.tableEnv.registerTable("T1", t1)
+ streamUtil.tableEnv.registerTable("T2", t2)
+
+ val sqlQuery =
+ """
+ |SELECT t2.a, t2.c, t1.c
+ |FROM T1 AS t1
+ |JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+ | t2.proctime + INTERVAL '5' SECOND
+ |""".stripMargin
+
+ val expected =
+ unaryNode("DataStreamCalc",
+ binaryNode("DataStreamWindowJoin",
+ unaryNode("DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime", "null AS nullField")
+ ),
+ unaryNode("DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "c", "proctime", "12 AS nullField")
+ ),
+ term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, " +
+ "-(proctime0, 5000)), <=(proctime, DATETIME_PLUS(proctime0, 5000)))"),
+ term("join", "a", "c", "proctime", "nullField", "a0", "c0", "proctime0", "nullField0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a0 AS a", "c0 AS c", "c AS c0")
+ )
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
val sqlQuery =