You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/16 10:04:33 UTC

flink git commit: [FLINK-7698] [table] Tests joins with null literals

Repository: flink
Updated Branches:
  refs/heads/master d0b2aa28d -> 101fef739


[FLINK-7698] [table] Tests joins with null literals


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/101fef73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/101fef73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/101fef73

Branch: refs/heads/master
Commit: 101fef7397128b0aba23221481ab86f62b18118f
Parents: d0b2aa2
Author: twalthr <tw...@apache.org>
Authored: Thu Nov 16 11:00:30 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Thu Nov 16 11:00:30 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/stream/sql/JoinTest.scala   | 45 ++++++++++++++++++++
 1 file changed, 45 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/101fef73/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 8c1865c..c14b698 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -19,7 +19,9 @@ package org.apache.flink.table.api.stream.sql
 
 import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.runtime.join.WindowJoinUtil
 import org.apache.flink.table.utils.TableTestUtil.{term, _}
@@ -245,6 +247,49 @@ class JoinTest extends TableTestBase {
   }
 
   @Test
+  def testJoinWithNullLiteral(): Unit = {
+    val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+    val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime)
+      .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)
+
+    val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime)
+      .select('a, 'b, 'c, 'proctime, 12L as 'nullField)
+
+    streamUtil.tableEnv.registerTable("T1", t1)
+    streamUtil.tableEnv.registerTable("T2", t2)
+
+    val sqlQuery =
+      """
+        |SELECT t2.a, t2.c, t1.c
+        |FROM T1 AS t1
+        |JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+        |  t2.proctime + INTERVAL '5' SECOND
+        |""".stripMargin
+
+    val expected =
+      unaryNode("DataStreamCalc",
+        binaryNode("DataStreamWindowJoin",
+          unaryNode("DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime", "null AS nullField")
+          ),
+          unaryNode("DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "c", "proctime", "12 AS nullField")
+          ),
+          term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, " +
+            "-(proctime0, 5000)), <=(proctime, DATETIME_PLUS(proctime0, 5000)))"),
+          term("join", "a", "c", "proctime", "nullField", "a0", "c0", "proctime0", "nullField0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a0 AS a", "c0 AS c", "c AS c0")
+      )
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
 
     val sqlQuery =