You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 17:41:59 UTC

[2/5] flink git commit: [FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.

[FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.

This closes #4977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ad18309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ad18309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ad18309

Branch: refs/heads/master
Commit: 1ad18309b974541d209f40c395c9f37bd907f32a
Parents: 949ae79
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Nov 8 01:17:57 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 17:12:18 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 17 ++--
 docs/dev/table/tableApi.md                      | 26 +++---
 .../table/runtime/join/WindowJoinUtil.scala     | 95 ++++++++++++--------
 .../flink/table/api/stream/sql/JoinTest.scala   | 66 ++++++++++++++
 .../sql/validation/JoinValidationTest.scala     | 29 ++++++
 .../flink/table/api/stream/table/JoinTest.scala | 34 +++++++
 .../table/runtime/stream/sql/JoinITCase.scala   | 47 ++++++++++
 7 files changed, 259 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 2318271..3097d9e 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -400,14 +400,15 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a special join 
-        condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
-
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>For example, the following predicates are valid window join conditions:</p>
+          
+        <ul>
+          <li><code>ltime = rtime</code></li>
+          <li><code>ltime &gt;= rtime AND ltime &lt; rtime + INTERVAL '10' MINUTE</code></li>
+          <li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
+        </ul>
+                
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight sql %}

http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 7cce042..f5a2059 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -527,13 +527,14 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>For example, the following predicates are valid window join conditions:</p>
 
+        <ul>
+          <li><code>ltime === rtime</code></li>
+          <li><code>ltime &gt;= rtime &amp;&amp; ltime &lt; rtime + 10.minutes</code></li>
+        </ul>
+        
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight java %}
@@ -644,13 +645,14 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>For example, the following predicates are valid window join conditions:</p>
 
+        <ul>
+          <li><code>'ltime === 'rtime</code></li>
+          <li><code>'ltime &gt;= 'rtime &amp;&amp; 'ltime &lt; 'rtime + 10.minutes</code></li>
+        </ul>
+        
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight scala %}

http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 863f342..1693c41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -78,29 +78,45 @@ object WindowJoinUtil {
     // Converts the condition to conjunctive normal form (CNF)
     val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
 
-    // split the condition into time indicator condition and other condition
+    // split the condition into time predicates and other predicates
+    // We need two range predicates or an equality predicate for a properly bounded window join.
     val (timePreds, otherPreds) = cnfCondition match {
-        // We need at least two comparison predicates for a properly bounded window join.
-        // So we need an AND expression for a valid window join.
-        case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
-          c.getOperands.asScala
-            .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
-            .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
-              analyzed match {
-                case Left(timePred) => (preds._1 :+ timePred, preds._2)
-                case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
-              }
-            })
-        case _ =>
-          // No valid window bounds. A windowed stream join requires two comparison predicates that
-          // bound the time in both directions.
-          return (None, Some(predicate))
+      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+        // extract all time predicates from conjunctive predicate
+        c.getOperands.asScala
+          .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+          .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+            analyzed match {
+              case Left(timePred) => (preds._1 :+ timePred, preds._2)
+              case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+            }
+          })
+      case c: RexCall =>
+        // extract time predicate if it exists
+        identifyTimePredicate(c, leftLogicalFieldCnt, inputSchema) match {
+          case Left(timePred) => (Seq[TimePredicate](timePred), Seq[RexNode]())
+          case Right(otherPred) => (Seq[TimePredicate](), Seq[RexNode](otherPred))
+        }
+      case _ =>
+        // No valid window bounds.
+        return (None, Some(predicate))
     }
 
-    if (timePreds.size != 2) {
-      // No valid window bounds. A windowed stream join requires two comparison predicates that
-      // bound the time in both directions.
-      return (None, Some(predicate))
+    timePreds match {
+      case Seq() =>
+        return (None, Some(predicate))
+      case Seq(t) if t.pred.getKind != SqlKind.EQUALS =>
+        // single predicate must be equality predicate
+        return (None, Some(predicate))
+      case s@Seq(_, _) if s.exists(_.pred.getKind == SqlKind.EQUALS) =>
+        // a pair of range predicates must not include an equality predicate
+        return (None, Some(predicate))
+      case Seq(_) =>
+        // Single equality predicate is OK
+      case Seq(_, _) =>
+        // Two range (i.e., non-equality) predicates are OK
+      case _ =>
+        return (None, Some(predicate))
     }
 
     // assemble window bounds from predicates
@@ -108,9 +124,14 @@ object WindowJoinUtil {
     val (leftLowerBound, leftUpperBound) =
       streamTimeOffsets match {
         case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+          // two range predicates
           (x.bound, y.bound)
         case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+          // two range predicates
           (y.bound, x.bound)
+        case Seq(Some(x: WindowBound)) =>
+          // single equality predicate
+          (x.bound, x.bound)
         case _ =>
           // Window join requires two comparison predicate that bound the time in both directions.
           return (None, Some(predicate))
@@ -118,12 +139,12 @@ object WindowJoinUtil {
 
     // compose the remain condition list into one condition
     val remainCondition =
-    otherPreds match {
-      case Seq() =>
-        None
-      case _ =>
-        Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
-    }
+      otherPreds match {
+        case Seq() =>
+          None
+        case _ =>
+          Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
+      }
 
     val bounds = if (timePreds.head.leftInputOnLeftSide) {
       Some(WindowBounds(
@@ -146,14 +167,15 @@ object WindowJoinUtil {
 
   /**
     * Analyzes a predicate and identifies whether it is a valid predicate for a window join.
-    * A valid window join predicate is a comparison predicate (<, <=, =>, >) that accesses
-    * time attributes of both inputs, each input on a different side of the condition.
+    *
+    * A valid window join predicate is a range or equality predicate (<, <=, ==, >=, >) that
+    * accesses time attributes of both inputs, each input on a different side of the condition.
     * Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
     *
     * Examples:
     * - left.rowtime > right.rowtime + 2.minutes => valid
+    * - left.rowtime == right.rowtime => valid
     * - left.proctime < right.rowtime + 2.minutes => invalid: different time type
-    * - left.rowtime == right.rowtime + 2.minutes => invalid: not a comparison predicate
     * - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
     *
     * If the predicate is a regular join predicate, i.e., it accesses no time attribute it is
@@ -172,7 +194,8 @@ object WindowJoinUtil {
           case SqlKind.GREATER_THAN |
                SqlKind.GREATER_THAN_OR_EQUAL |
                SqlKind.LESS_THAN |
-               SqlKind.LESS_THAN_OR_EQUAL =>
+               SqlKind.LESS_THAN_OR_EQUAL |
+               SqlKind.EQUALS =>
 
             val leftTerm = c.getOperands.get(0)
             val rightTerm = c.getOperands.get(1)
@@ -235,7 +258,7 @@ object WindowJoinUtil {
     *
     * @return A Seq of all time attribute accessed in the expression.
     */
-  def extractTimeAttributeAccesses(
+  private def extractTimeAttributeAccesses(
       expr: RexNode,
       leftFieldCount: Int,
       inputType: RelDataType): Seq[TimeAttributeAccess] = {
@@ -248,9 +271,9 @@ object WindowJoinUtil {
           case t: TimeIndicatorRelDataType =>
             // time attribute access. Remember time type and side of input
             if (idx < leftFieldCount) {
-              Seq(TimeAttributeAccess(t.isEventTime, true, idx))
+              Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = true, idx))
             } else {
-              Seq(TimeAttributeAccess(t.isEventTime, false, idx - leftFieldCount))
+              Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = false, idx - leftFieldCount))
             }
           case _ =>
             // not a time attribute access.
@@ -272,7 +295,7 @@ object WindowJoinUtil {
     * @param inputType The input type of the expression.
     * @return True, if the expression accesses a non-time attribute. False otherwise.
     */
-  def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+  private def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
     expr match {
       case i: RexInputRef =>
         val accessedType = inputType.getFieldList.get(i.getIndex).getType
@@ -292,7 +315,7 @@ object WindowJoinUtil {
     *
     * @return window boundary, is left lower bound
     */
-  def computeWindowBoundFromPredicate(
+  private def computeWindowBoundFromPredicate(
       timePred: TimePredicate,
       rexBuilder: RexBuilder,
       config: TableConfig): Option[WindowBound] = {
@@ -303,6 +326,8 @@ object WindowJoinUtil {
           timePred.leftInputOnLeftSide
         case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
           !timePred.leftInputOnLeftSide
+        case (SqlKind.EQUALS) =>
+          true // We don't care about this since there's only one bound value.
         case _ =>
           return None
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 53aff82..ded5c51 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -185,6 +185,66 @@ class JoinTest extends TableTestBase {
   }
 
   @Test
+  def testJoinWithEquiProcTime(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime = t2.proctime
+        |""".stripMargin
+
+    val expected =
+      unaryNode("DataStreamCalc",
+        binaryNode("DataStreamWindowJoin",
+          unaryNode("DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode("DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where", "AND(=(a, a0), =(proctime, proctime0))"),
+          term("join", "a", "proctime", "a0", "b", "proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testJoinWithEquiRowTime(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.c = t2.c
+        |""".stripMargin
+
+    val expected =
+      unaryNode("DataStreamCalc",
+        binaryNode("DataStreamWindowJoin",
+          unaryNode("DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c")
+          ),
+          unaryNode("DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "c")
+          ),
+          term("where", "AND(=(a, a0), =(c, c0))"),
+          term("join", "a", "c", "a0", "b", "c0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
 
     val sqlQuery =
@@ -332,6 +392,12 @@ class JoinTest extends TableTestBase {
       -10000,
       -5000,
       "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c = t2.c",
+      0,
+      0,
+      "rowtime")
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
index 9cce37e..9f7078c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -92,4 +92,33 @@ class JoinValidationTest extends TableTestBase {
     streamUtil.verifySql(sql, "n/a")
   }
 
+  /** Validates that a combination of range and equality predicates is not accepted **/
+  @Test(expected = classOf[TableException])
+  def testRangeAndEqualityPredicates(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+        |  t1.proctime = t2.proctime
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Validates that an equality predicate with an offset is not accepted **/
+  @Test(expected = classOf[TableException])
+  def testEqualityPredicateWithOffset(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime = t2.proctime - INTERVAL '5' SECOND
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
index 07e879f..79b413c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -100,6 +100,40 @@ class JoinTest extends TableTestBase {
     util.verifyTable(resultTable, expected)
   }
 
+  @Test
+  def testProcTimeWindowInnerJoinWithEquiTimeAttrs(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime === 'rtime)
+      .select('a, 'e, 'ltime)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "ltime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "d", "e", "rtime")
+          ),
+          term("where", "AND(=(a, d), =(ltime, rtime))"),
+          term("join", "a", "ltime", "d", "e", "rtime"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "e", "PROCTIME(ltime) AS ltime")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
   /**
     * The time indicator can be accessed from non-time predicates now.
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 1d7bab6..85929e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -187,6 +187,53 @@ class JoinITCase extends StreamingWithStateTestBase {
     StreamITCase.compareWithList(expected)
   }
 
+  /** test rowtime inner join with equi-times **/
+  @Test
+  def testRowTimeInnerJoinWithEquiTimeAttrs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t2.key, t2.id, t1.id
+        |FROM T1 as t1 join T2 as t2 ON
+        |  t1.key = t2.key AND
+        |  t2.rt = t1.rt
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+
+    data1.+=((4, 4000L, "A", 4000L))
+    data1.+=((5, 5000L, "A", 5000L))
+    data1.+=((6, 6000L, "A", 6000L))
+    data1.+=((6, 6000L, "B", 6000L))
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "R-5", 5000L))
+    data2.+=(("B", "R-6", 6000L))
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+      .toTable(tEnv, 'id, 'tm, 'key, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = new java.util.ArrayList[String]
+    expected.add("A,R-5,5")
+    expected.add("B,R-6,6")
+    StreamITCase.compareWithList(expected)
+  }
+
   /** test rowtime inner join with other conditions **/
   @Test
   def testRowTimeInnerJoinWithOtherConditions(): Unit = {