You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/16 16:33:49 UTC
flink git commit: [FLINK-7798] [table] Add support for stream
time-windowed inner join to Table API
Repository: flink
Updated Branches:
refs/heads/master 861c57cb1 -> e79cedf23
[FLINK-7798] [table] Add support for stream time-windowed inner join to Table API
This closes #4825.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e79cedf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e79cedf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e79cedf2
Branch: refs/heads/master
Commit: e79cedf23b42a67f28c23c558a4b0be2179aba2d
Parents: 861c57c
Author: Xingcan Cui <xi...@gmail.com>
Authored: Sat Oct 14 02:25:47 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 16 18:18:51 2017 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 1 -
docs/dev/table/tableApi.md | 100 +++++++--------
.../flink/table/plan/logical/operators.scala | 5 -
.../flink/table/api/stream/table/JoinTest.scala | 127 +++++++++++++++++++
.../table/validation/JoinValidationTest.scala | 95 ++++++++++++++
.../table/runtime/stream/sql/JoinITCase.scala | 80 +++++++++---
6 files changed, 335 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 81dabee..8591910 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -405,7 +405,6 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
<li>Time predicates must compare time attributes of both input tables.</li>
<li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
<li>Only range predicates are valid time predicates.</li>
- <li>Non-time predicates must not access a time attribute.</li>
</ul>
</p>
http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 0a2acab..2294300 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -464,6 +464,7 @@ val result: Table = orders
val orders: Table = tableEnv.scan("Orders")
val result = orders.distinct()
{% endhighlight %}
+ <p><b>Note:</b> For streaming queries the required state to compute the query result might grow indefinitely depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
</td>
</tr>
</tbody>
@@ -503,45 +504,45 @@ Table result = left.join(right).where("a = d").select("a, b, e");
<tr>
<td>
- <strong>Left Outer Join</strong><br>
+ <strong>Outer Joins</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+ <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
{% highlight java %}
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
-Table result = left.leftOuterJoin(right, "a = d").select("a, b, e");
-{% endhighlight %}
- </td>
- </tr>
- <tr>
- <td>
- <strong>Right Outer Join</strong><br>
- <span class="label label-primary">Batch</span>
- </td>
- <td>
- <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
-{% highlight java %}
-Table left = tableEnv.fromDataSet(ds1, "a, b, c");
-Table right = tableEnv.fromDataSet(ds2, "d, e, f");
-Table result = left.rightOuterJoin(right, "a = d").select("a, b, e");
+Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
+Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
+Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
{% endhighlight %}
</td>
</tr>
-
<tr>
- <td>
- <strong>Full Outer Join</strong><br>
+ <td><strong>Time-windowed Join</strong><br>
<span class="label label-primary">Batch</span>
+ <span class="label label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+ <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+ <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+ <ul>
+ <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+ <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
+ </ul>
+ </p>
+
+ <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
+
{% highlight java %}
-Table left = tableEnv.fromDataSet(ds1, "a, b, c");
-Table right = tableEnv.fromDataSet(ds2, "d, e, f");
-Table result = left.fullOuterJoin(right, "a = d").select("a, b, e");
+Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
+Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
+
+Table result = left.join(right)
+ .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
+ .select("a, b, e, ltime");
{% endhighlight %}
</td>
</tr>
@@ -609,7 +610,7 @@ Table result = orders
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p>
+ <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through the join operator or using a where or filter operator.</p>
{% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'd, 'e, 'f);
@@ -617,48 +618,47 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
{% endhighlight %}
</td>
</tr>
-
<tr>
<td>
- <strong>Left Outer Join</strong><br>
+ <strong>Outer Joins</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+ <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
{% highlight scala %}
val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
-val result = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
-{% endhighlight %}
- </td>
- </tr>
- <tr>
- <td>
- <strong>Right Outer Join</strong><br>
- <span class="label label-primary">Batch</span>
- </td>
- <td>
- <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
-{% highlight scala %}
-val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
-val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
-val result = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
{% endhighlight %}
</td>
</tr>
-
<tr>
- <td>
- <strong>Full Outer Join</strong><br>
+ <td><strong>Time-windowed Join</strong><br>
<span class="label label-primary">Batch</span>
+ <span class="label label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+ <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+ <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+ <ul>
+ <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+ <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
+ </ul>
+ </p>
+
+ <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
+
{% highlight scala %}
-val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
-val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
-val result = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime);
+val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime);
+
+val result = left.join(right)
+ .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
+ .select('a, 'b, 'e, 'ltime);
{% endhighlight %}
</td>
</tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 0c8efd7..ab72d47 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -429,11 +429,6 @@ case class Join(
left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]
- && !right.isInstanceOf[LogicalTableFunctionCall]) {
- failValidation(s"Join on stream tables is currently not supported.")
- }
-
val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
failValidation(s"Filter operator requires a boolean expression as input, " +
http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
new file mode 100644
index 0000000..07e879f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Plan tests for stream-stream joins: only time-windowed inner joins are currently supported.
+ */
+class JoinTest extends TableTestBase {
+
+  /** Rowtime join bounded by rtime - 5 min <= ltime < rtime + 3 s. */
+  @Test
+  def testRowTimeWindowInnerJoin(): Unit = {
+    val streamUtil = streamTestUtil()
+    val leftTable = streamUtil.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val rightTable = streamUtil.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val joined = leftTable
+      .join(rightTable)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    // Each input is narrowed by a Calc to the fields it contributes to the join and result.
+    // Interval literals appear in milliseconds: 5.minutes -> 300000, 3.seconds -> 3000.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "ltime"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "d", "e", "rtime"))
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      term("where", "AND(=(a, d), >=(ltime, -(rtime, 300000))," +
+        " <(ltime, DATETIME_PLUS(rtime, 3000)))"),
+      term("join", "a", "ltime", "d", "e", "rtime"),
+      term("joinType", "InnerJoin")
+    )
+    val expectedPlan =
+      unaryNode("DataStreamCalc", windowJoin, term("select", "a", "e", "ltime"))
+
+    streamUtil.verifyTable(joined, expectedPlan)
+  }
+
+  /** Proctime join bounded by rtime - 1 s <= ltime < rtime. */
+  @Test
+  def testProcTimeWindowInnerJoin(): Unit = {
+    val streamUtil = streamTestUtil()
+    val leftTable = streamUtil.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+    val rightTable = streamUtil.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime)
+
+    val joined = leftTable
+      .join(rightTable)
+      .where('a === 'd && 'ltime >= 'rtime - 1.second && 'ltime < 'rtime)
+      .select('a, 'e, 'ltime)
+
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "ltime"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "d", "e", "rtime"))
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      term("where", "AND(=(a, d), >=(ltime, -(rtime, 1000)), <(ltime, rtime))"),
+      term("join", "a", "ltime", "d", "e", "rtime"),
+      term("joinType", "InnerJoin")
+    )
+    // The proctime attribute in the final projection is rendered as PROCTIME(ltime).
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      windowJoin,
+      term("select", "a", "e", "PROCTIME(ltime) AS ltime"))
+
+    streamUtil.verifyTable(joined, expectedPlan)
+  }
+
+  /**
+   * Non-time predicates may reference time indicators: 'ltime > 'f compares the rowtime
+   * attribute with a plain Timestamp field and is kept in the window join condition.
+   */
+  @Test
+  def testInnerJoinWithTimeIndicatorAccessed(): Unit = {
+    val streamUtil = streamTestUtil()
+    val leftTable = streamUtil.addTable[(Long, Int, Timestamp)]('a, 'b, 'c, 'ltime.rowtime)
+    val rightTable = streamUtil.addTable[(Long, Int, Timestamp)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val joined = leftTable
+      .join(rightTable)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime && 'ltime > 'f)
+
+    // No Calc nodes are expected here: the window join reads both table scans directly.
+    val expectedPlan = binaryNode(
+      "DataStreamWindowJoin",
+      streamTableNode(0),
+      streamTableNode(1),
+      term("where", "AND(=(a, d), >=(ltime, -(rtime, 300000)), <(ltime, rtime), >(ltime, f))"),
+      term("join", "a", "b", "c", "ltime", "d", "e", "f", "rtime"),
+      term("joinType", "InnerJoin")
+    )
+
+    streamUtil.verifyTable(joined, expectedPlan)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..e924e6e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+ * Validation tests for stream-stream joins. Only time-windowed inner joins are supported, so each
+ * query below must fail with a TableException; the empty expected plan is just a placeholder.
+ */
+class JoinValidationTest extends TableTestBase {
+
+  /**
+   * At least one equi-join predicate is required; the condition here has only time predicates.
+   */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithoutEquiPredicate(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    util.verifyTable(resultTable, "")
+  }
+
+  /**
+   * Incomplete window bounds: the upper bound compares 'ltime with itself instead of 'rtime.
+   */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithIncompleteWindowBounds1(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'ltime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    util.verifyTable(resultTable, "")
+  }
+
+  /**
+   * Incomplete window bounds: both time predicates ('>=' and '>') are lower bounds on 'ltime.
+   */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithIncompleteWindowBounds2(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime > 'rtime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    util.verifyTable(resultTable, "")
+  }
+
+  /**
+   * The compared time attributes must be of the same type; here proctime is compared to rowtime.
+   */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithDifferentTimeIndicators(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds)
+
+    util.verifyTable(resultTable, "")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 015a5a2..119f92f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -18,6 +18,8 @@
package org.apache.flink.table.runtime.stream.sql
+import java.util
+
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
@@ -27,7 +29,6 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
import org.apache.flink.types.Row
-import org.hamcrest.CoreMatchers
import org.junit._
import scala.collection.mutable
@@ -238,22 +239,67 @@ class JoinITCase extends StreamingWithStateTestBase {
env.execute()
// There may be two expected results according to the process order.
- val expected1 = new mutable.MutableList[String]
- expected1+= "1,LEFT3,RIGHT6"
- expected1+= "1,LEFT1.1,RIGHT6"
- expected1+= "2,LEFT4,RIGHT7"
- expected1+= "1,LEFT4.9,RIGHT6"
-
- val expected2 = new mutable.MutableList[String]
- expected2+= "1,LEFT3,RIGHT6"
- expected2+= "1,LEFT1.1,RIGHT6"
- expected2+= "2,LEFT4,RIGHT7"
- expected2+= "1,LEFT4.9,RIGHT6"
-
- Assert.assertThat(
- StreamITCase.testResults.sorted,
- CoreMatchers.either(CoreMatchers.is(expected1.sorted)).
- or(CoreMatchers.is(expected2.sorted)))
+ val expected = new util.ArrayList[String]
+ expected.add("1,LEFT3,RIGHT6")
+ expected.add("1,LEFT1.1,RIGHT6")
+ expected.add("2,LEFT4,RIGHT7")
+ expected.add("1,LEFT4.9,RIGHT6")
+ StreamITCase.compareWithList(expected)
+ }
+
+  /** test rowtime inner join with an additional non-time predicate on a time attribute **/
+  @Test
+  def testRowTimeInnerJoinWithOtherTimeCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+    // Time-windowed join: t1.rt must lie in the open interval (t2.rt - 4 s, t2.rt).
+    val sqlQuery =
+      """
+        |SELECT t2.a, t1.c, t2.c
+        |FROM T1 as t1 JOIN T2 as t2 ON
+        | t1.a = t2.a AND
+        | t1.rt > t2.rt - INTERVAL '4' SECOND AND
+        | t1.rt < t2.rt AND
+        | QUARTER(t1.rt) = t2.a
+        |""".stripMargin
+    // Tuples are (a, b, c, timestamp in ms); presumably Row4WatermarkExtractor uses the last field.
+    val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+    data1.+=((1, 4L, "LEFT1", 1000L))
+    data1.+=((1, 2L, "LEFT2", 2000L))
+    data1.+=((1, 7L, "LEFT3", 3000L))
+    data1.+=((2, 5L, "LEFT4", 4000L))
+    data1.+=((1, 4L, "LEFT5", 5000L))
+    data1.+=((1, 10L, "LEFT6", 6000L))
+
+    val data2 = new mutable.MutableList[(Int, Long, String, Long)]
+    data2.+=((1, 1L, "RIGHT1", 1000L))
+    data2.+=((1, 9L, "RIGHT6", 6000L))
+    data2.+=((2, 8L, "RIGHT7", 7000L))
+    data2.+=((1, 4L, "RIGHT8", 8000L))
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+      .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+      .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    // QUARTER(t1.rt) is presumably 1 here, so (LEFT4, RIGHT7) with a = 2 must be filtered out.
+    val expected = new util.ArrayList[String]
+    expected.add("1,LEFT3,RIGHT6")
+    expected.add("1,LEFT5,RIGHT6")
+    expected.add("1,LEFT5,RIGHT8")
+    expected.add("1,LEFT6,RIGHT8")
+    StreamITCase.compareWithList(expected)
+  }
/** test rowtime inner join with window aggregation **/