You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/07/18 07:40:07 UTC
[2/3] flink git commit: [FLINK-6232] [table] Add SQL documentation
for time window join.
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
deleted file mode 100644
index 15e8b89..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.join.WindowJoinUtil
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Assert._
-import org.junit.Test
-
-class JoinTest extends TableTestBase {
- private val streamUtil: StreamTableTestUtil = streamTestUtil()
- streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
- streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
-
- @Test
- def testProcessingTimeInnerJoin() = {
-
- val sqlQuery = "SELECT t1.a, t2.b " +
- "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " +
- "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour"
- val expected =
- unaryNode(
- "DataStreamCalc",
- binaryNode(
- "DataStreamWindowJoin",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "proctime")
- ),
- unaryNode(
- "DataStreamCalc",
- streamTableNode(1),
- term("select", "a", "b", "proctime")
- ),
- term("where",
- "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
- "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
- term("join", "a, proctime, a0, b, proctime0"),
- term("joinType", "InnerJoin")
- ),
- term("select", "a", "b")
- )
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- /** There should exist time conditions **/
- @Test(expected = classOf[TableException])
- def testWindowJoinUnExistTimeCondition() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a"
- streamUtil.verifySql(sql, "n/a")
- }
-
- /** There should exist exactly two time conditions **/
- @Test(expected = classOf[TableException])
- def testWindowJoinSingleTimeCondition() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
- " and t1.proctime > t2.proctime - interval '5' second"
- streamUtil.verifySql(sql, "n/a")
- }
-
- /** Both time attributes in a join condition must be of the same type **/
- @Test(expected = classOf[TableException])
- def testWindowJoinDiffTimeIndicator() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
- " and t1.proctime > t2.proctime - interval '5' second " +
- " and t1.proctime < t2.c + interval '5' second"
- streamUtil.verifySql(sql, "n/a")
- }
-
- /** The time conditions should be an And condition **/
- @Test(expected = classOf[TableException])
- def testWindowJoinNotCnfCondition() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
- " and (t1.proctime > t2.proctime - interval '5' second " +
- " or t1.proctime < t2.c + interval '5' second)"
- streamUtil.verifySql(sql, "n/a")
- }
-
- @Test
- def testJoinTimeBoundary(): Unit = {
- verifyTimeBoundary(
- "t1.proctime between t2.proctime - interval '1' hour " +
- "and t2.proctime + interval '1' hour",
- -3600000,
- 3600000,
- "proctime")
-
- verifyTimeBoundary(
- "t1.proctime > t2.proctime - interval '1' second and " +
- "t1.proctime < t2.proctime + interval '1' second",
- -999,
- 999,
- "proctime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c - interval '1' second and " +
- "t1.c <= t2.c + interval '1' second",
- -1000,
- 1000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c and " +
- "t1.c <= t2.c + interval '1' second",
- 0,
- 1000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c + interval '1' second and " +
- "t1.c <= t2.c + interval '10' second",
- 1000,
- 10000,
- "rowtime")
-
- verifyTimeBoundary(
- "t2.c - interval '1' second <= t1.c and " +
- "t2.c + interval '10' second >= t1.c",
- -1000,
- 10000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c - interval '2' second >= t2.c + interval '1' second -" +
- "interval '10' second and " +
- "t1.c <= t2.c + interval '10' second",
- -7000,
- 10000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c - interval '10' second and " +
- "t1.c <= t2.c - interval '5' second",
- -10000,
- -5000,
- "rowtime")
- }
-
- @Test
- def testJoinRemainConditionConvert(): Unit = {
- streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
- streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
- val query =
- "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
- "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
- "t1.c > t2.c"
- verifyRemainConditionConvert(
- query,
- ">($1, $3)")
-
- val query1 =
- "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
- "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
- verifyRemainConditionConvert(
- query1,
- "")
-
- streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
- streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
- val query2 =
- "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
- "t1.proctime >= t2.proctime - interval '10' second " +
- "and t1.proctime <= t2.proctime - interval '5' second and " +
- "t1.c > t2.c"
- verifyRemainConditionConvert(
- query2,
- ">($2, $5)")
- }
-
- def verifyTimeBoundary(
- timeSql: String,
- expLeftSize: Long,
- expRightSize: Long,
- expTimeType: String) = {
- val query =
- "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
-
- val resultTable = streamUtil.tEnv.sql(query)
- val relNode = resultTable.getRelNode
- val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
- val rexNode = joinNode.getCondition
- val (isRowTime, lowerBound, upperBound, conditionWithoutTime) =
- WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
- joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
- val timeTypeStr =
- if (isRowTime) "rowtime"
- else "proctime"
- assertEquals(expLeftSize, lowerBound)
- assertEquals(expRightSize, upperBound)
- assertEquals(expTimeType, timeTypeStr)
- }
-
- def verifyRemainConditionConvert(
- query: String,
- expectCondStr: String) = {
-
- val resultTable = streamUtil.tEnv.sql(query)
- val relNode = resultTable.getRelNode
- val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
- val joinInfo = joinNode.analyzeCondition
- val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
- val (isRowTime, lowerBound, upperBound, remainCondition) =
- WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
- joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
- val actual: String = remainCondition.getOrElse("").toString
-
- assertEquals(expectCondStr, actual)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
new file mode 100644
index 0000000..640fd26
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+ streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+ @Test
+ def testProcessingTimeInnerJoinWithOnClause() = {
+
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+ |""".stripMargin
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamWindowJoin",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "proctime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "proctime")
+ ),
+ term("where",
+ "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+ "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+ term("join", "a, proctime, a0, b, proctime0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testProcessingTimeInnerJoinWithWhereClause() = {
+
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+ |""".stripMargin
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamWindowJoin",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "proctime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "proctime")
+ ),
+ term("where",
+ "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+ "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+ term("join", "a, proctime, a0, b, proctime0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b0 AS b")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testJoinTimeBoundary(): Unit = {
+ verifyTimeBoundary(
+ "t1.proctime between t2.proctime - interval '1' hour " +
+ "and t2.proctime + interval '1' hour",
+ -3600000,
+ 3600000,
+ "proctime")
+
+ verifyTimeBoundary(
+ "t1.proctime > t2.proctime - interval '1' second and " +
+ "t1.proctime < t2.proctime + interval '1' second",
+ -999,
+ 999,
+ "proctime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c - interval '1' second and " +
+ "t1.c <= t2.c + interval '1' second",
+ -1000,
+ 1000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c and " +
+ "t1.c <= t2.c + interval '1' second",
+ 0,
+ 1000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c + interval '1' second and " +
+ "t1.c <= t2.c + interval '10' second",
+ 1000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t2.c - interval '1' second <= t1.c and " +
+ "t2.c + interval '10' second >= t1.c",
+ -1000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c - interval '2' second >= t2.c + interval '1' second -" +
+ "interval '10' second and " +
+ "t1.c <= t2.c + interval '10' second",
+ -7000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c - interval '10' second and " +
+ "t1.c <= t2.c - interval '5' second",
+ -10000,
+ -5000,
+ "rowtime")
+ }
+
+ @Test
+ def testJoinRemainConditionConvert(): Unit = {
+ streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+ streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+ val query =
+ "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+ "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
+ "t1.c > t2.c"
+ verifyRemainConditionConvert(
+ query,
+ ">($2, $6)")
+
+ val query1 =
+ "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+ "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
+ verifyRemainConditionConvert(
+ query1,
+ "")
+
+ streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
+ streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
+ val query2 =
+ "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
+ "t1.proctime >= t2.proctime - interval '10' second " +
+ "and t1.proctime <= t2.proctime - interval '5' second and " +
+ "t1.c > t2.c"
+ verifyRemainConditionConvert(
+ query2,
+ ">($2, $6)")
+ }
+
+ private def verifyTimeBoundary(
+ timeSql: String,
+ expLeftSize: Long,
+ expRightSize: Long,
+ expTimeType: String): Unit = {
+ val query =
+ "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
+
+ val resultTable = streamUtil.tableEnv.sql(query)
+ val relNode = resultTable.getRelNode
+ val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+ val rexNode = joinNode.getCondition
+ val (windowBounds, _) =
+ WindowJoinUtil.extractWindowBoundsFromPredicate(
+ rexNode,
+ 4,
+ joinNode.getRowType,
+ joinNode.getCluster.getRexBuilder,
+ streamUtil.tableEnv.getConfig)
+
+ val timeTypeStr =
+ if (windowBounds.get.isEventTime) "rowtime"
+ else "proctime"
+ assertEquals(expLeftSize, windowBounds.get.leftLowerBound)
+ assertEquals(expRightSize, windowBounds.get.leftUpperBound)
+ assertEquals(expTimeType, timeTypeStr)
+ }
+
+ private def verifyRemainConditionConvert(
+ query: String,
+ expectCondStr: String): Unit = {
+
+ val resultTable = streamUtil.tableEnv.sql(query)
+ val relNode = resultTable.getRelNode
+ val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+ val joinInfo = joinNode.analyzeCondition
+ val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+ val (_, remainCondition) =
+ WindowJoinUtil.extractWindowBoundsFromPredicate(
+ rexNode,
+ 4,
+ joinNode.getRowType,
+ joinNode.getCluster.getRexBuilder,
+ streamUtil.tableEnv.getConfig)
+
+ val actual: String = remainCondition.getOrElse("").toString
+
+ assertEquals(expectCondStr, actual)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..9cce37e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class JoinValidationTest extends TableTestBase {
+
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+ streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+ /** The join condition must contain time conditions **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinUnExistTimeCondition() = {
+ val sql =
+ """
+ |SELECT t2.a
+ |FROM MyTable t1 JOIN MyTable2 t2 ON t1.a = t2.a""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** There should exist exactly two time conditions **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinSingleTimeCondition() = {
+ val sql =
+ """
+ |SELECT t2.a
+ |FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime > t2.proctime - INTERVAL '5' SECOND""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** Both time attributes in a join condition must be of the same type **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinDiffTimeIndicator() = {
+ val sql =
+ """
+ |SELECT t2.a FROM
+ |MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+ | t1.proctime < t2.c + INTERVAL '5' SECOND""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** The time conditions must be combined with AND, not OR **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinNotCnfCondition() = {
+ val sql =
+ """
+ |SELECT t2.a
+ |FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | (t1.proctime > t2.proctime - INTERVAL '5' SECOND OR
+ | t1.proctime < t2.c + INTERVAL '5' SECOND)""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** A query whose output schema contains a rowtime attribute must be rejected **/
+ @Test(expected = classOf[TableException])
+ def testNoRowtimeAttributeInResult(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND t2.proctime
+ | """.stripMargin
+
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 7885160..90c8ea4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
"DataStreamCalc",
streamTableNode(0),
term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
- term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+ term("where", ">(rowtime, 1990-12-02 12:11:11)")
)
util.verifyTable(result, expected)
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index c008ed3..6c24c5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -20,20 +20,18 @@ package org.apache.flink.table.runtime.harness
import java.util.concurrent.ConcurrentLinkedQueue
import java.lang.{Integer => JInt}
-import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness}
-import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.junit.Test
-
+import org.junit.Assert.{assertEquals, assertTrue}
class JoinHarnessTest extends HarnessTestBase{
@@ -89,7 +87,7 @@ class JoinHarnessTest extends HarnessTestBase{
new TupleRowKeySelector[Integer](0),
new TupleRowKeySelector[Integer](0),
BasicTypeInfo.INT_TYPE_INFO,
- 1,1,0)
+ 1, 1, 0)
testHarness.open()
@@ -97,16 +95,16 @@ class JoinHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa"), true), 1))
- assert(testHarness.numProcessingTimeTimers() == 1)
+ assertEquals(1, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(2: JInt, "bbb"), true), 2))
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa2"), true), 3))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// right stream input and output normally
testHarness.processElement2(new StreamRecord(
@@ -114,20 +112,20 @@ class JoinHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(4)
testHarness.processElement2(new StreamRecord(
CRow(Row.of(2: JInt, "Hello1"), true), 4))
- assert(testHarness.numKeyedStateEntries() == 8)
- assert(testHarness.numProcessingTimeTimers() == 4)
+ assertEquals(8, testHarness.numKeyedStateEntries())
+ assertEquals(4, testHarness.numProcessingTimeTimers())
// expired left stream record at timestamp 1
testHarness.setProcessingTime(12)
- assert(testHarness.numKeyedStateEntries() == 8)
- assert(testHarness.numProcessingTimeTimers() == 4)
+ assertEquals(8, testHarness.numKeyedStateEntries())
+ assertEquals(4, testHarness.numProcessingTimeTimers())
testHarness.processElement2(new StreamRecord(
CRow(Row.of(1: JInt, "Hi2"), true), 12))
// expired right stream record at timestamp 4 and all left stream
testHarness.setProcessingTime(25)
- assert(testHarness.numKeyedStateEntries() == 2)
- assert(testHarness.numProcessingTimeTimers() == 1)
+ assertEquals(2, testHarness.numKeyedStateEntries())
+ assertEquals(1, testHarness.numProcessingTimeTimers())
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa3"), true), 25))
testHarness.processElement1(new StreamRecord(
@@ -136,9 +134,9 @@ class JoinHarnessTest extends HarnessTestBase{
CRow(Row.of(2: JInt, "Hello2"), true), 25))
testHarness.setProcessingTime(45)
- assert(testHarness.numKeyedStateEntries() > 0)
+ assertTrue(testHarness.numKeyedStateEntries() > 0)
testHarness.setProcessingTime(46)
- assert(testHarness.numKeyedStateEntries() == 0)
+ assertEquals(0, testHarness.numKeyedStateEntries())
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -175,7 +173,7 @@ class JoinHarnessTest extends HarnessTestBase{
new TupleRowKeySelector[Integer](0),
new TupleRowKeySelector[Integer](0),
BasicTypeInfo.INT_TYPE_INFO,
- 1,1,0)
+ 1, 1, 0)
testHarness.open()
@@ -188,37 +186,38 @@ class JoinHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa3"), true), 3))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// Do not store b elements
// not meet a.proctime <= b.proctime - 5
testHarness.processElement2(new StreamRecord(
CRow(Row.of(1: JInt, "bbb3"), true), 3))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// meet a.proctime <= b.proctime - 5
testHarness.setProcessingTime(7)
testHarness.processElement2(new StreamRecord(
CRow(Row.of(2: JInt, "bbb7"), true), 7))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// expire record of stream a at timestamp 1
testHarness.setProcessingTime(12)
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.processElement2(new StreamRecord(
CRow(Row.of(1: JInt, "bbb12"), true), 12))
testHarness.setProcessingTime(13)
- assert(testHarness.numKeyedStateEntries() == 2)
- assert(testHarness.numProcessingTimeTimers() == 1)
+ assertEquals(2, testHarness.numKeyedStateEntries())
+ assertEquals(1, testHarness.numProcessingTimeTimers())
- testHarness.setProcessingTime(14)
- assert(testHarness.numKeyedStateEntries() == 0)
- assert(testHarness.numProcessingTimeTimers() == 0)
+ // state must be cleaned after the window timer interval has passed without new rows.
+ testHarness.setProcessingTime(23)
+ assertEquals(0, testHarness.numKeyedStateEntries())
+ assertEquals(0, testHarness.numProcessingTimeTimers())
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
new file mode 100644
index 0000000..ab7925b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+ /** Runs a processing-time inner join; the sink output is not asserted **/
+ @Test
+ def testProcessTimeInnerJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+ "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
+
+ val data1 = new mutable.MutableList[(Int, Long, String)]
+ data1.+=((1, 1L, "Hi1"))
+ data1.+=((1, 2L, "Hi2"))
+ data1.+=((1, 5L, "Hi3"))
+ data1.+=((2, 7L, "Hi5"))
+ data1.+=((1, 9L, "Hi6"))
+ data1.+=((1, 8L, "Hi8"))
+
+ val data2 = new mutable.MutableList[(Int, Long, String)]
+ data2.+=((1, 1L, "HiHi"))
+ data2.+=((2, 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+ /** Runs a processing-time inner join with extra non-time predicates; output is not asserted **/
+ @Test
+ def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+ "t1.proctime between t2.proctime - interval '5' second " +
+ "and t2.proctime + interval '5' second " +
+ "and t1.b > t2.b and t1.b + t2.b < 14"
+
+ val data1 = new mutable.MutableList[(String, Long, String)]
+ data1.+=(("1", 1L, "Hi1"))
+ data1.+=(("1", 2L, "Hi2"))
+ data1.+=(("1", 5L, "Hi3"))
+ data1.+=(("2", 7L, "Hi5"))
+ data1.+=(("1", 9L, "Hi6"))
+ data1.+=(("1", 8L, "Hi8"))
+
+ val data2 = new mutable.MutableList[(String, Long, String)]
+ data2.+=(("1", 5L, "HiHi"))
+ data2.+=(("2", 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+}
+