You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/16 16:33:49 UTC

flink git commit: [FLINK-7798] [table] Add support for stream time-windowed inner join to Table API

Repository: flink
Updated Branches:
  refs/heads/master 861c57cb1 -> e79cedf23


[FLINK-7798] [table] Add support for stream time-windowed inner join to Table API

This closes #4825.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e79cedf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e79cedf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e79cedf2

Branch: refs/heads/master
Commit: e79cedf23b42a67f28c23c558a4b0be2179aba2d
Parents: 861c57c
Author: Xingcan Cui <xi...@gmail.com>
Authored: Sat Oct 14 02:25:47 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 16 18:18:51 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |   1 -
 docs/dev/table/tableApi.md                      | 100 +++++++--------
 .../flink/table/plan/logical/operators.scala    |   5 -
 .../flink/table/api/stream/table/JoinTest.scala | 127 +++++++++++++++++++
 .../table/validation/JoinValidationTest.scala   |  95 ++++++++++++++
 .../table/runtime/stream/sql/JoinITCase.scala   |  80 +++++++++---
 6 files changed, 335 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 81dabee..8591910 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -405,7 +405,6 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
             <li>Time predicates must compare time attributes of both input tables.</li>
             <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
             <li>Only range predicates are valid time predicates.</li>
-            <li>Non-time predicates must not access a time attribute.</li>
           </ul>
         </p>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 0a2acab..2294300 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -464,6 +464,7 @@ val result: Table = orders
 val orders: Table = tableEnv.scan("Orders")
 val result = orders.distinct()
 {% endhighlight %}
+        <p><b>Note:</b> For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
   </tbody>
@@ -503,45 +504,45 @@ Table result = left.join(right).where("a = d").select("a, b, e");
 
     <tr>
       <td>
-        <strong>Left Outer Join</strong><br>
+        <strong>Outer Joins</strong><br>
         <span class="label label-primary">Batch</span>
       </td>
       <td>
-        <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+        <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight java %}
 Table left = tableEnv.fromDataSet(ds1, "a, b, c");
 Table right = tableEnv.fromDataSet(ds2, "d, e, f");
-Table result = left.leftOuterJoin(right, "a = d").select("a, b, e");
-{% endhighlight %}
-      </td>
-    </tr>
 
-    <tr>
-      <td>
-        <strong>Right Outer Join</strong><br>
-        <span class="label label-primary">Batch</span>
-      </td>
-      <td>
-        <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
-{% highlight java %}
-Table left = tableEnv.fromDataSet(ds1, "a, b, c");
-Table right = tableEnv.fromDataSet(ds2, "d, e, f");
-Table result = left.rightOuterJoin(right, "a = d").select("a, b, e");
+Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
+Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
+Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
 {% endhighlight %}
       </td>
     </tr>
-
     <tr>
-      <td>
-        <strong>Full Outer Join</strong><br>
+      <td><strong>Time-windowed Join</strong><br>
         <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+        <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+        <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> &lt;, &lt;=, &gt;=, &gt;</code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+          <ul>
+            <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
+          </ul>
+        </p>
+
+        <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
+
 {% highlight java %}
-Table left = tableEnv.fromDataSet(ds1, "a, b, c");
-Table right = tableEnv.fromDataSet(ds2, "d, e, f");
-Table result = left.fullOuterJoin(right, "a = d").select("a, b, e");
+Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
+Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
+
+Table result = left.join(right)
+  .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
+  .select("a, b, e, ltime");
 {% endhighlight %}
       </td>
     </tr>
@@ -609,7 +610,7 @@ Table result = orders
         <span class="label label-primary">Batch</span>
       </td>
       <td>
-        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p>
+        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through the join operator or using a where or filter operator.</p>
 {% highlight scala %}
 val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
 val right = ds2.toTable(tableEnv, 'd, 'e, 'f);
@@ -617,48 +618,47 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
 {% endhighlight %}
       </td>
     </tr>
-
     <tr>
       <td>
-        <strong>Left Outer Join</strong><br>
+        <strong>Outer Joins</strong><br>
         <span class="label label-primary">Batch</span>
       </td>
       <td>
-        <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+        <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight scala %}
 val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
 val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
-val result = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
-{% endhighlight %}
-      </td>
-    </tr>
 
-    <tr>
-      <td>
-        <strong>Right Outer Join</strong><br>
-        <span class="label label-primary">Batch</span>
-      </td>
-      <td>
-        <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
-{% highlight scala %}
-val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
-val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
-val result = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
 {% endhighlight %}
       </td>
     </tr>
-
     <tr>
-      <td>
-        <strong>Full Outer Join</strong><br>
+      <td><strong>Time-windowed Join</strong><br>
         <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+        <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+        <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> &lt;, &lt;=, &gt;=, &gt;</code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+          <ul>
+            <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
+          </ul>
+        </p>
+
+        <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
+
 {% highlight scala %}
-val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
-val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
-val result = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime);
+val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime);
+
+val result = left.join(right)
+  .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
+  .select('a, 'b, 'e, 'ltime);
 {% endhighlight %}
       </td>
     </tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 0c8efd7..ab72d47 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -429,11 +429,6 @@ case class Join(
     left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
-    if (tableEnv.isInstanceOf[StreamTableEnvironment]
-      && !right.isInstanceOf[LogicalTableFunctionCall]) {
-      failValidation(s"Join on stream tables is currently not supported.")
-    }
-
     val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
     if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
       failValidation(s"Filter operator requires a boolean expression as input, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
new file mode 100644
index 0000000..07e879f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Plan tests for streaming joins. Currently only time-windowed inner joins are supported.
+  */
+class JoinTest extends TableTestBase {
+
+  @Test
+  def testRowTimeWindowInnerJoin(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "ltime") // fields b and c are not used by the query
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "d", "e", "rtime") // field f is not used by the query
+          ),
+          term("where", "AND(=(a, d), >=(ltime, -(rtime, 300000))," +
+            " <(ltime, DATETIME_PLUS(rtime, 3000)))"), // intervals are printed in milliseconds
+          term("join", "a", "ltime", "d", "e", "rtime"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "e", "ltime")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testProcTimeWindowInnerJoin(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 1.second && 'ltime < 'rtime)
+      .select('a, 'e, 'ltime)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "ltime") // fields b and c are not used by the query
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "d", "e", "rtime") // field f is not used by the query
+          ),
+          term("where", "AND(=(a, d), >=(ltime, -(rtime, 1000)), <(ltime, rtime))"),
+          term("join", "a", "ltime", "d", "e", "rtime"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "e", "PROCTIME(ltime) AS ltime")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+  /**
+    * Time indicators may also be accessed by non-window predicates (here: 'ltime > 'f).
+    */
+  @Test
+  def testInnerJoinWithTimeIndicatorAccessed(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, Timestamp)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, Timestamp)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime && 'ltime > 'f)
+
+    val expected =
+      binaryNode(
+        "DataStreamWindowJoin",
+        streamTableNode(0), // no Calc nodes: the query has no select and keeps all fields
+        streamTableNode(1),
+        term("where", "AND(=(a, d), >=(ltime, -(rtime, 300000)), <(ltime, rtime), >(ltime, f))"),
+        term("join", "a", "b", "c", "ltime", "d", "e", "f", "rtime"),
+        term("joinType", "InnerJoin")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..e924e6e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Validation tests for streaming joins. Currently only time-windowed inner joins are supported.
+  */
+class JoinValidationTest extends TableTestBase {
+
+  /**
+    * At least one equi-join predicate is required; this query only has window bounds.
+    */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithoutEquiPredicate(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    // The expected plan is irrelevant because the test expects a TableException.
+    util.verifyTable(resultTable, "")
+  }
+
+  /**
+    * Window bounds must be complete: the second time predicate does not reference 'rtime.
+    */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithIncompleteWindowBounds1(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'ltime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    util.verifyTable(resultTable, "")
+  }
+
+  /**
+    * Window bounds must be complete: both time predicates are lower bounds for 'ltime.
+    */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithIncompleteWindowBounds2(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime > 'rtime + 3.seconds)
+      .select('a, 'e, 'ltime)
+
+    util.verifyTable(resultTable, "")
+  }
+
+  /**
+    * Time indicators of both tables must be of the same type (here: proctime vs. rowtime).
+    */
+  @Test(expected = classOf[TableException])
+  def testInnerJoinWithDifferentTimeIndicators(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds)
+
+    util.verifyTable(resultTable, "")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 015a5a2..119f92f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import java.util
+
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
@@ -27,7 +29,6 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
-import org.hamcrest.CoreMatchers
 import org.junit._
 
 import scala.collection.mutable
@@ -238,22 +239,67 @@ class JoinITCase extends StreamingWithStateTestBase {
     env.execute()
 
     // There may be two expected results according to the process order.
-    val expected1 = new mutable.MutableList[String]
-    expected1+= "1,LEFT3,RIGHT6"
-    expected1+= "1,LEFT1.1,RIGHT6"
-    expected1+= "2,LEFT4,RIGHT7"
-    expected1+= "1,LEFT4.9,RIGHT6"
-
-    val expected2 = new mutable.MutableList[String]
-    expected2+= "1,LEFT3,RIGHT6"
-    expected2+= "1,LEFT1.1,RIGHT6"
-    expected2+= "2,LEFT4,RIGHT7"
-    expected2+= "1,LEFT4.9,RIGHT6"
-
-    Assert.assertThat(
-      StreamITCase.testResults.sorted,
-      CoreMatchers.either(CoreMatchers.is(expected1.sorted)).
-        or(CoreMatchers.is(expected2.sorted)))
+    val expected = new util.ArrayList[String]
+    expected.add("1,LEFT3,RIGHT6")
+    expected.add("1,LEFT1.1,RIGHT6")
+    expected.add("2,LEFT4,RIGHT7")
+    expected.add("1,LEFT4.9,RIGHT6")
+    StreamITCase.compareWithList(expected)
+  }
+
+  /** test rowtime inner join with an additional non-window predicate on a time attribute **/
+  @Test
+  def testRowTimeInnerJoinWithOtherTimeCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t2.a, t1.c, t2.c
+        |FROM T1 as t1 JOIN T2 as t2 ON
+        |  t1.a = t2.a AND
+        |  t1.rt > t2.rt - INTERVAL '4' SECOND AND
+        |    t1.rt < t2.rt AND
+        |  QUARTER(t1.rt) = t2.a
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(Int, Long, String, Long)] // columns: a, b, c, rt
+    data1.+=((1, 4L, "LEFT1", 1000L))
+    data1.+=((1, 2L, "LEFT2", 2000L))
+    data1.+=((1, 7L, "LEFT3", 3000L))
+    data1.+=((2, 5L, "LEFT4", 4000L))
+    data1.+=((1, 4L, "LEFT5", 5000L))
+    data1.+=((1, 10L, "LEFT6", 6000L))
+
+    val data2 = new mutable.MutableList[(Int, Long, String, Long)] // columns: a, b, c, rt
+    data2.+=((1, 1L, "RIGHT1", 1000L))
+    data2.+=((1, 9L, "RIGHT6", 6000L))
+    data2.+=((2, 8L, "RIGHT7", 7000L))
+    data2.+=((1, 4L, "RIGHT8", 8000L))
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+      .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+      .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    // NOTE(review): only a = 1 rows are expected; QUARTER(rt) presumably yields 1 for this data.
+    val expected = new util.ArrayList[String]
+    expected.add("1,LEFT3,RIGHT6")
+    expected.add("1,LEFT5,RIGHT6")
+    expected.add("1,LEFT5,RIGHT8")
+    expected.add("1,LEFT6,RIGHT8")
+    StreamITCase.compareWithList(expected)
   }
 
   /** test rowtime inner join with window aggregation **/