You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/07/18 07:40:07 UTC

[2/3] flink git commit: [FLINK-6232] [table] Add SQL documentation for time window join.

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
deleted file mode 100644
index 15e8b89..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.join.WindowJoinUtil
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Assert._
-import org.junit.Test
-
-class JoinTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
-  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
-
-  @Test
-  def testProcessingTimeInnerJoin() = {
-
-    val sqlQuery = "SELECT t1.a, t2.b " +
-      "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " +
-      "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        binaryNode(
-          "DataStreamWindowJoin",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "proctime")
-          ),
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(1),
-            term("select", "a", "b", "proctime")
-          ),
-          term("where",
-            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
-              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
-          term("join", "a, proctime, a0, b, proctime0"),
-          term("joinType", "InnerJoin")
-        ),
-        term("select", "a", "b")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  /** There should exist time conditions **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinUnExistTimeCondition() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  /** There should exist exactly two time conditions **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinSingleTimeCondition() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
-      " and t1.proctime > t2.proctime - interval '5' second"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  /** Both time attributes in a join condition must be of the same type **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinDiffTimeIndicator() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
-      " and t1.proctime > t2.proctime - interval '5' second " +
-      " and t1.proctime < t2.c + interval '5' second"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  /** The time conditions should be an And condition **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinNotCnfCondition() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
-      " and (t1.proctime > t2.proctime - interval '5' second " +
-      " or t1.proctime < t2.c + interval '5' second)"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  @Test
-  def testJoinTimeBoundary(): Unit = {
-    verifyTimeBoundary(
-      "t1.proctime between t2.proctime - interval '1' hour " +
-        "and t2.proctime + interval '1' hour",
-      -3600000,
-      3600000,
-      "proctime")
-
-    verifyTimeBoundary(
-      "t1.proctime > t2.proctime - interval '1' second and " +
-        "t1.proctime < t2.proctime + interval '1' second",
-      -999,
-      999,
-      "proctime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c - interval '1' second and " +
-        "t1.c <= t2.c + interval '1' second",
-      -1000,
-      1000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c and " +
-        "t1.c <= t2.c + interval '1' second",
-      0,
-      1000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c + interval '1' second and " +
-        "t1.c <= t2.c + interval '10' second",
-      1000,
-      10000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t2.c - interval '1' second <= t1.c and " +
-        "t2.c + interval '10' second >= t1.c",
-      -1000,
-      10000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
-        "interval '10' second and " +
-        "t1.c <= t2.c + interval '10' second",
-      -7000,
-      10000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c - interval '10' second and " +
-        "t1.c <= t2.c - interval '5' second",
-      -10000,
-      -5000,
-      "rowtime")
-  }
-
-  @Test
-  def testJoinRemainConditionConvert(): Unit = {
-    streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
-    streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
-    val query =
-      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
-        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
-        "t1.c > t2.c"
-    verifyRemainConditionConvert(
-      query,
-      ">($1, $3)")
-
-    val query1 =
-      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
-        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
-    verifyRemainConditionConvert(
-      query1,
-      "")
-
-    streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
-    streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
-    val query2 =
-      "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
-        "t1.proctime >= t2.proctime - interval '10' second " +
-        "and t1.proctime <= t2.proctime - interval '5' second and " +
-        "t1.c > t2.c"
-    verifyRemainConditionConvert(
-      query2,
-      ">($2, $5)")
-  }
-
-  def verifyTimeBoundary(
-      timeSql: String,
-      expLeftSize: Long,
-      expRightSize: Long,
-      expTimeType: String) = {
-    val query =
-      "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
-
-    val resultTable = streamUtil.tEnv.sql(query)
-    val relNode = resultTable.getRelNode
-    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
-    val rexNode = joinNode.getCondition
-    val (isRowTime, lowerBound, upperBound, conditionWithoutTime) =
-      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
-        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
-    val timeTypeStr =
-      if (isRowTime) "rowtime"
-      else  "proctime"
-    assertEquals(expLeftSize, lowerBound)
-    assertEquals(expRightSize, upperBound)
-    assertEquals(expTimeType, timeTypeStr)
-  }
-
-  def verifyRemainConditionConvert(
-      query: String,
-      expectCondStr: String) = {
-
-    val resultTable = streamUtil.tEnv.sql(query)
-    val relNode = resultTable.getRelNode
-    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
-    val joinInfo = joinNode.analyzeCondition
-    val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
-    val (isRowTime, lowerBound, upperBound, remainCondition) =
-      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
-        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
-    val actual: String = remainCondition.getOrElse("").toString
-
-    assertEquals(expectCondStr, actual)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
new file mode 100644
index 0000000..640fd26
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.Test
+
+/**
+  * Plan and predicate-analysis tests for SQL joins of two streams that are bounded by a
+  * time window on their time attributes.
+  *
+  * Both input tables expose the fields (a, b, c, proctime); c is declared as the rowtime
+  * attribute and proctime as the processing-time attribute.
+  */
+class JoinTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+  /** Window join with the equi-key and the time bounds given in the ON clause. */
+  @Test
+  def testProcessingTimeInnerJoinWithOnClause(): Unit = {
+
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where",
+            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+          term("join", "a, proctime, a0, b, proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  /** Same window join, written as a cross join whose predicates are in the WHERE clause. */
+  @Test
+  def testProcessingTimeInnerJoinWithWhereClause(): Unit = {
+
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where",
+            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+          term("join", "a, proctime, a0, b, proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  /**
+    * Checks the window bounds (in milliseconds) and the time type derived from various
+    * spellings of the time predicate.
+    */
+  @Test
+  def testJoinTimeBoundary(): Unit = {
+    verifyTimeBoundary(
+      "t1.proctime between t2.proctime - interval '1' hour " +
+        "and t2.proctime + interval '1' hour",
+      -3600000,
+      3600000,
+      "proctime")
+
+    // strict comparisons (> and <) tighten each bound by one millisecond
+    verifyTimeBoundary(
+      "t1.proctime > t2.proctime - interval '1' second and " +
+        "t1.proctime < t2.proctime + interval '1' second",
+      -999,
+      999,
+      "proctime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '1' second and " +
+        "t1.c <= t2.c + interval '1' second",
+      -1000,
+      1000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c and " +
+        "t1.c <= t2.c + interval '1' second",
+      0,
+      1000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c + interval '1' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      1000,
+      10000,
+      "rowtime")
+
+    // operands swapped: the right table's attribute is on the left-hand side
+    verifyTimeBoundary(
+      "t2.c - interval '1' second <= t1.c and " +
+        "t2.c + interval '10' second >= t1.c",
+      -1000,
+      10000,
+      "rowtime")
+
+    // offsets on both sides: t1.c - 2s >= t2.c + 1s - 10s  <=>  t1.c >= t2.c - 7s
+    verifyTimeBoundary(
+      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
+        "interval '10' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      -7000,
+      10000,
+      "rowtime")
+
+    // both bounds are negative
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '10' second and " +
+        "t1.c <= t2.c - interval '5' second",
+      -10000,
+      -5000,
+      "rowtime")
+  }
+
+  /**
+    * Checks the predicate that is left over once the equi-join key and the window bounds
+    * have been split off the join condition.
+    */
+  @Test
+  def testJoinRemainConditionConvert(): Unit = {
+    streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    val query =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
+        "t1.c > t2.c"
+    // presumably $2 is t1.c and $6 is t2.c (4 left fields + index 2) - confirm
+    verifyRemainConditionConvert(
+      query,
+      ">($2, $6)")
+
+    val query1 =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
+    // no predicate besides the join key and the time bounds: nothing is expected to remain
+    verifyRemainConditionConvert(
+      query1,
+      "")
+
+    streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
+    val query2 =
+      "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
+        "t1.proctime >= t2.proctime - interval '10' second " +
+        "and t1.proctime <= t2.proctime - interval '5' second and " +
+        "t1.c > t2.c"
+    verifyRemainConditionConvert(
+      query2,
+      ">($2, $6)")
+  }
+
+  /**
+    * Joins MyTable (t1) with MyTable2 (t2) on t1.a = t2.a plus the given time predicate and
+    * asserts the window bounds extracted from the condition of the logical join.
+    *
+    * @param timeSql      time predicate that is appended to the join condition
+    * @param expLeftSize  expected lower window bound
+    * @param expRightSize expected upper window bound
+    * @param expTimeType  expected time type, either "rowtime" or "proctime"
+    */
+  private def verifyTimeBoundary(
+      timeSql: String,
+      expLeftSize: Long,
+      expRightSize: Long,
+      expTimeType: String): Unit = {
+    val query =
+      "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
+
+    val resultTable = streamUtil.tableEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val rexNode = joinNode.getCondition
+    // NOTE(review): 4 is presumably the field count of the left input - confirm
+    val (windowBounds, _) =
+      WindowJoinUtil.extractWindowBoundsFromPredicate(
+        rexNode,
+        4,
+        joinNode.getRowType,
+        joinNode.getCluster.getRexBuilder,
+        streamUtil.tableEnv.getConfig)
+
+    // fail with a readable message instead of a NoSuchElementException from Option.get
+    val bounds = windowBounds.getOrElse(
+      throw new AssertionError(s"No window bounds extracted from predicate: $timeSql"))
+
+    val timeTypeStr = if (bounds.isEventTime) "rowtime" else "proctime"
+    assertEquals(expLeftSize, bounds.leftLowerBound)
+    assertEquals(expRightSize, bounds.leftUpperBound)
+    assertEquals(expTimeType, timeTypeStr)
+  }
+
+  /**
+    * Asserts the string form of the predicate that remains after the window bounds have been
+    * extracted from the remaining condition reported by the join's analyzed condition.
+    *
+    * @param query         SQL query; the input of its root node must be the logical join
+    * @param expectCondStr expected remaining predicate, "" if none is expected
+    */
+  private def verifyRemainConditionConvert(
+      query: String,
+      expectCondStr: String): Unit = {
+
+    val resultTable = streamUtil.tableEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val joinInfo = joinNode.analyzeCondition
+    val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+    val (_, remainCondition) =
+      WindowJoinUtil.extractWindowBoundsFromPredicate(
+        rexNode,
+        4,
+        joinNode.getRowType,
+        joinNode.getCluster.getRexBuilder,
+        streamUtil.tableEnv.getConfig)
+
+    val actual: String = remainCondition.getOrElse("").toString
+
+    assertEquals(expectCondStr, actual)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..9cce37e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+/**
+  * Validation tests for SQL stream joins: every query in this class is expected to be
+  * rejected with a [[TableException]].
+  */
+class JoinValidationTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+  /** There should exist time conditions: a join on the key alone must be rejected. */
+  @Test(expected = classOf[TableException])
+  def testWindowJoinUnExistTimeCondition(): Unit = {
+    val sql =
+      """
+        |SELECT t2.a
+        |FROM MyTable t1 JOIN MyTable2 t2 ON t1.a = t2.a""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** There should exist exactly two time conditions: a single one must be rejected. */
+  @Test(expected = classOf[TableException])
+  def testWindowJoinSingleTimeCondition(): Unit = {
+    val sql =
+      """
+        |SELECT t2.a
+        |FROM MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime > t2.proctime - INTERVAL '5' SECOND""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Both time attributes in a join condition must be of the same type (t2.c is rowtime). */
+  @Test(expected = classOf[TableException])
+  def testWindowJoinDiffTimeIndicator(): Unit = {
+    val sql =
+      """
+        |SELECT t2.a FROM
+        |MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+        |  t1.proctime < t2.c + INTERVAL '5' SECOND""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** The time conditions should be an AND condition: an OR of them must be rejected. */
+  @Test(expected = classOf[TableException])
+  def testWindowJoinNotCnfCondition(): Unit = {
+    val sql =
+      """
+        |SELECT t2.a
+        |FROM MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  (t1.proctime > t2.proctime - INTERVAL '5' SECOND OR
+        |   t1.proctime < t2.c + INTERVAL '5' SECOND)""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Validates that no rowtime attribute is in the output schema (SELECT * includes c). */
+  @Test(expected = classOf[TableException])
+  def testNoRowtimeAttributeInResult(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND t2.proctime
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 7885160..90c8ea4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
       "DataStreamCalc",
       streamTableNode(0),
       term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
-      term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+      term("where", ">(rowtime, 1990-12-02 12:11:11)")
     )
 
     util.verifyTable(result, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index c008ed3..6c24c5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -20,20 +20,18 @@ package org.apache.flink.table.runtime.harness
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.lang.{Integer => JInt}
 
-import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness}
-import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
 import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
 import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.junit.Test
-
+import org.junit.Assert.{assertEquals, assertTrue}
 
 class JoinHarnessTest extends HarnessTestBase{
 
@@ -89,7 +87,7 @@ class JoinHarnessTest extends HarnessTestBase{
        new TupleRowKeySelector[Integer](0),
        new TupleRowKeySelector[Integer](0),
        BasicTypeInfo.INT_TYPE_INFO,
-       1,1,0)
+       1, 1, 0)
 
     testHarness.open()
 
@@ -97,16 +95,16 @@ class JoinHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa"), true), 1))
-    assert(testHarness.numProcessingTimeTimers() == 1)
+    assertEquals(1, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(2: JInt, "bbb"), true), 2))
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(2, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa2"), true), 3))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // right stream input and output normally
     testHarness.processElement2(new StreamRecord(
@@ -114,20 +112,20 @@ class JoinHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(4)
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(2: JInt, "Hello1"), true), 4))
-    assert(testHarness.numKeyedStateEntries() == 8)
-    assert(testHarness.numProcessingTimeTimers() == 4)
+    assertEquals(8, testHarness.numKeyedStateEntries())
+    assertEquals(4, testHarness.numProcessingTimeTimers())
 
     // expired left stream record at timestamp 1
     testHarness.setProcessingTime(12)
-    assert(testHarness.numKeyedStateEntries() == 8)
-    assert(testHarness.numProcessingTimeTimers() == 4)
+    assertEquals(8, testHarness.numKeyedStateEntries())
+    assertEquals(4, testHarness.numProcessingTimeTimers())
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(1: JInt, "Hi2"), true), 12))
 
     // expired right stream record at timestamp 4 and all left stream
     testHarness.setProcessingTime(25)
-    assert(testHarness.numKeyedStateEntries() == 2)
-    assert(testHarness.numProcessingTimeTimers() == 1)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+    assertEquals(1, testHarness.numProcessingTimeTimers())
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa3"), true), 25))
     testHarness.processElement1(new StreamRecord(
@@ -136,9 +134,9 @@ class JoinHarnessTest extends HarnessTestBase{
       CRow(Row.of(2: JInt, "Hello2"), true), 25))
 
     testHarness.setProcessingTime(45)
-    assert(testHarness.numKeyedStateEntries() > 0)
+    assertTrue(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(46)
-    assert(testHarness.numKeyedStateEntries() == 0)
+    assertEquals(0, testHarness.numKeyedStateEntries())
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -175,7 +173,7 @@ class JoinHarnessTest extends HarnessTestBase{
         new TupleRowKeySelector[Integer](0),
         new TupleRowKeySelector[Integer](0),
         BasicTypeInfo.INT_TYPE_INFO,
-        1,1,0)
+        1, 1, 0)
 
     testHarness.open()
 
@@ -188,37 +186,38 @@ class JoinHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa3"), true), 3))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // Do not store b elements
     // not meet a.proctime <= b.proctime - 5
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(1: JInt, "bbb3"), true), 3))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // meet a.proctime <= b.proctime - 5
     testHarness.setProcessingTime(7)
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(2: JInt, "bbb7"), true), 7))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // expire record of stream a at timestamp 1
     testHarness.setProcessingTime(12)
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(1: JInt, "bbb12"), true), 12))
 
     testHarness.setProcessingTime(13)
-    assert(testHarness.numKeyedStateEntries() == 2)
-    assert(testHarness.numProcessingTimeTimers() == 1)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+    assertEquals(1, testHarness.numProcessingTimeTimers())
 
-    testHarness.setProcessingTime(14)
-    assert(testHarness.numKeyedStateEntries() == 0)
-    assert(testHarness.numProcessingTimeTimers() == 0)
+    // state must be cleaned after the window timer interval has passed without new rows.
+    testHarness.setProcessingTime(23)
+    assertEquals(0, testHarness.numKeyedStateEntries())
+    assertEquals(0, testHarness.numProcessingTimeTimers())
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
new file mode 100644
index 0000000..ab7925b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  /** Smoke test: runs a proctime window inner join of T1 and T2; output is not asserted. */
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
+
+    val data1 = new mutable.MutableList[(Int, Long, String)]
+    data1.+=((1, 1L, "Hi1"))
+    data1.+=((1, 2L, "Hi2"))
+    data1.+=((1, 5L, "Hi3"))
+    data1.+=((2, 7L, "Hi5"))
+    data1.+=((1, 9L, "Hi6"))
+    data1.+=((1, 8L, "Hi8"))
+
+    val data2 = new mutable.MutableList[(Int, Long, String)]
+    data2.+=((1, 1L, "HiHi"))
+    data2.+=((2, 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  /** Smoke test: proctime window join with extra predicates on b; output is not asserted. */
+  @Test
+  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second " +
+      "and t2.proctime + interval '5' second " +
+      "and t1.b > t2.b and t1.b + t2.b < 14"
+
+    val data1 = new mutable.MutableList[(String, Long, String)]
+    data1.+=(("1", 1L, "Hi1"))
+    data1.+=(("1", 2L, "Hi2"))
+    data1.+=(("1", 5L, "Hi3"))
+    data1.+=(("2", 7L, "Hi5"))
+    data1.+=(("1", 9L, "Hi6"))
+    data1.+=(("1", 8L, "Hi8"))
+
+    val data2 = new mutable.MutableList[(String, Long, String)]
+    data2.+=(("1", 5L, "HiHi"))
+    data2.+=(("2", 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+}
+