You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 17:41:59 UTC
[2/5] flink git commit: [FLINK-7996] [table] Add support for
(left.time = right.time) predicates to window join.
[FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.
This closes #4977.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ad18309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ad18309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ad18309
Branch: refs/heads/master
Commit: 1ad18309b974541d209f40c395c9f37bd907f32a
Parents: 949ae79
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Nov 8 01:17:57 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 17:12:18 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sql.md | 17 ++--
docs/dev/table/tableApi.md | 26 +++---
.../table/runtime/join/WindowJoinUtil.scala | 95 ++++++++++++--------
.../flink/table/api/stream/sql/JoinTest.scala | 66 ++++++++++++++
.../sql/validation/JoinValidationTest.scala | 29 ++++++
.../flink/table/api/stream/table/JoinTest.scala | 34 +++++++
.../table/runtime/stream/sql/JoinITCase.scala | 47 ++++++++++
7 files changed, 259 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 2318271..3097d9e 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -400,14 +400,15 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires at least one equi-join predicate and a special join
- condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
- <ul>
- <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
- <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
- </ul>
- </p>
-
+ <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+ <p>For example, the following predicates are valid window join conditions:</p>
+
+ <ul>
+ <li><code>ltime = rtime</code></li>
+ <li><code>ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE</code></li>
+ <li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
+ </ul>
+
<p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight sql %}
http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 7cce042..f5a2059 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -527,13 +527,14 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
- <ul>
- <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
- <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
- </ul>
- </p>
+ <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+ <p>For example, the following predicates are valid window join conditions:</p>
+ <ul>
+ <li><code>ltime === rtime</code></li>
+ <li><code>ltime >= rtime && ltime < rtime + 10.minutes</code></li>
+ </ul>
+
<p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight java %}
@@ -644,13 +645,14 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
- <ul>
- <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
- <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
- </ul>
- </p>
+ <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+ <p>For example, the following predicates are valid window join conditions:</p>
+ <ul>
+ <li><code>'ltime === 'rtime</code></li>
+ <li><code>'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes</code></li>
+ </ul>
+
<p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight scala %}
http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 863f342..1693c41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -78,29 +78,45 @@ object WindowJoinUtil {
// Converts the condition to conjunctive normal form (CNF)
val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
- // split the condition into time indicator condition and other condition
+ // split the condition into time predicates and other predicates
+ // We need two range predicates or an equality predicate for a properly bounded window join.
val (timePreds, otherPreds) = cnfCondition match {
- // We need at least two comparison predicates for a properly bounded window join.
- // So we need an AND expression for a valid window join.
- case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
- c.getOperands.asScala
- .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
- .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
- analyzed match {
- case Left(timePred) => (preds._1 :+ timePred, preds._2)
- case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
- }
- })
- case _ =>
- // No valid window bounds. A windowed stream join requires two comparison predicates that
- // bound the time in both directions.
- return (None, Some(predicate))
+ case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+ // extract all time predicates from conjunctive predicate
+ c.getOperands.asScala
+ .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+ .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+ analyzed match {
+ case Left(timePred) => (preds._1 :+ timePred, preds._2)
+ case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+ }
+ })
+ case c: RexCall =>
+ // extract time predicate if it exists
+ identifyTimePredicate(c, leftLogicalFieldCnt, inputSchema) match {
+ case Left(timePred) => (Seq[TimePredicate](timePred), Seq[RexNode]())
+ case Right(otherPred) => (Seq[TimePredicate](), Seq[RexNode](otherPred))
+ }
+ case _ =>
+ // No valid window bounds.
+ return (None, Some(predicate))
}
- if (timePreds.size != 2) {
- // No valid window bounds. A windowed stream join requires two comparison predicates that
- // bound the time in both directions.
- return (None, Some(predicate))
+ timePreds match {
+ case Seq() =>
+ return (None, Some(predicate))
+ case Seq(t) if t.pred.getKind != SqlKind.EQUALS =>
+ // single predicate must be equality predicate
+ return (None, Some(predicate))
+ case s@Seq(_, _) if s.exists(_.pred.getKind == SqlKind.EQUALS) =>
+ // pair of range predicate must not include equals predicate
+ return (None, Some(predicate))
+ case Seq(_) =>
+ // Single equality predicate is OK
+ case Seq(_, _) =>
+ // Two range (i.e., non-equality predicates are OK
+ case _ =>
+ return (None, Some(predicate))
}
// assemble window bounds from predicates
@@ -108,9 +124,14 @@ object WindowJoinUtil {
val (leftLowerBound, leftUpperBound) =
streamTimeOffsets match {
case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+ // two range predicates
(x.bound, y.bound)
case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+ // two range predicates
(y.bound, x.bound)
+ case Seq(Some(x: WindowBound)) =>
+ // single equality predicate
+ (x.bound, x.bound)
case _ =>
// Window join requires two comparison predicate that bound the time in both directions.
return (None, Some(predicate))
@@ -118,12 +139,12 @@ object WindowJoinUtil {
// compose the remain condition list into one condition
val remainCondition =
- otherPreds match {
- case Seq() =>
- None
- case _ =>
- Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
- }
+ otherPreds match {
+ case Seq() =>
+ None
+ case _ =>
+ Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
+ }
val bounds = if (timePreds.head.leftInputOnLeftSide) {
Some(WindowBounds(
@@ -146,14 +167,15 @@ object WindowJoinUtil {
/**
* Analyzes a predicate and identifies whether it is a valid predicate for a window join.
- * A valid window join predicate is a comparison predicate (<, <=, =>, >) that accesses
- * time attributes of both inputs, each input on a different side of the condition.
+ *
+ * A valid window join predicate is a range or equality predicate (<, <=, ==, =>, >) that
+ * accesses time attributes of both inputs, each input on a different side of the condition.
* Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
*
* Examples:
* - left.rowtime > right.rowtime + 2.minutes => valid
+ * - left.rowtime == right.rowtime => valid
* - left.proctime < right.rowtime + 2.minutes => invalid: different time type
- * - left.rowtime == right.rowtime + 2.minutes => invalid: not a comparison predicate
* - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
*
* If the predicate is a regular join predicate, i.e., it accesses no time attribute it is
@@ -172,7 +194,8 @@ object WindowJoinUtil {
case SqlKind.GREATER_THAN |
SqlKind.GREATER_THAN_OR_EQUAL |
SqlKind.LESS_THAN |
- SqlKind.LESS_THAN_OR_EQUAL =>
+ SqlKind.LESS_THAN_OR_EQUAL |
+ SqlKind.EQUALS =>
val leftTerm = c.getOperands.get(0)
val rightTerm = c.getOperands.get(1)
@@ -235,7 +258,7 @@ object WindowJoinUtil {
*
* @return A Seq of all time attribute accessed in the expression.
*/
- def extractTimeAttributeAccesses(
+ private def extractTimeAttributeAccesses(
expr: RexNode,
leftFieldCount: Int,
inputType: RelDataType): Seq[TimeAttributeAccess] = {
@@ -248,9 +271,9 @@ object WindowJoinUtil {
case t: TimeIndicatorRelDataType =>
// time attribute access. Remember time type and side of input
if (idx < leftFieldCount) {
- Seq(TimeAttributeAccess(t.isEventTime, true, idx))
+ Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = true, idx))
} else {
- Seq(TimeAttributeAccess(t.isEventTime, false, idx - leftFieldCount))
+ Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = false, idx - leftFieldCount))
}
case _ =>
// not a time attribute access.
@@ -272,7 +295,7 @@ object WindowJoinUtil {
* @param inputType The input type of the expression.
* @return True, if the expression accesses a non-time attribute. False otherwise.
*/
- def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+ private def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
expr match {
case i: RexInputRef =>
val accessedType = inputType.getFieldList.get(i.getIndex).getType
@@ -292,7 +315,7 @@ object WindowJoinUtil {
*
* @return window boundary, is left lower bound
*/
- def computeWindowBoundFromPredicate(
+ private def computeWindowBoundFromPredicate(
timePred: TimePredicate,
rexBuilder: RexBuilder,
config: TableConfig): Option[WindowBound] = {
@@ -303,6 +326,8 @@ object WindowJoinUtil {
timePred.leftInputOnLeftSide
case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
!timePred.leftInputOnLeftSide
+ case (SqlKind.EQUALS) =>
+ true // We don't care about this since there's only one bound value.
case _ =>
return None
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 53aff82..ded5c51 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -185,6 +185,66 @@ class JoinTest extends TableTestBase {
}
@Test
+ def testJoinWithEquiProcTime(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime = t2.proctime
+ |""".stripMargin
+
+ val expected =
+ unaryNode("DataStreamCalc",
+ binaryNode("DataStreamWindowJoin",
+ unaryNode("DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "proctime")
+ ),
+ unaryNode("DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "proctime")
+ ),
+ term("where", "AND(=(a, a0), =(proctime, proctime0))"),
+ term("join", "a", "proctime", "a0", "b", "proctime0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b0 AS b")
+ )
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testJoinWithEquiRowTime(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.c = t2.c
+ |""".stripMargin
+
+ val expected =
+ unaryNode("DataStreamCalc",
+ binaryNode("DataStreamWindowJoin",
+ unaryNode("DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c")
+ ),
+ unaryNode("DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "c")
+ ),
+ term("where", "AND(=(a, a0), =(c, c0))"),
+ term("join", "a", "c", "a0", "b", "c0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b0 AS b")
+ )
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
val sqlQuery =
@@ -332,6 +392,12 @@ class JoinTest extends TableTestBase {
-10000,
-5000,
"rowtime")
+
+ verifyTimeBoundary(
+ "t1.c = t2.c",
+ 0,
+ 0,
+ "rowtime")
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
index 9cce37e..9f7078c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -92,4 +92,33 @@ class JoinValidationTest extends TableTestBase {
streamUtil.verifySql(sql, "n/a")
}
+ /** Validates that range and equality predicate are not accepted **/
+ @Test(expected = classOf[TableException])
+ def testRangeAndEqualityPredicates(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+ | t1.proctime = t2.proctime
+ | """.stripMargin
+
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** Validates that equality predicate with offset are not accepted **/
+ @Test(expected = classOf[TableException])
+ def testEqualityPredicateWithOffset(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime = t2.proctime - INTERVAL '5' SECOND
+ | """.stripMargin
+
+ streamUtil.verifySql(sql, "n/a")
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
index 07e879f..79b413c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -100,6 +100,40 @@ class JoinTest extends TableTestBase {
util.verifyTable(resultTable, expected)
}
+ @Test
+ def testProcTimeWindowInnerJoinWithEquiTimeAttrs(): Unit = {
+ val util = streamTestUtil()
+ val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+ val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime)
+
+ val resultTable = left.join(right)
+ .where('a === 'd && 'ltime === 'rtime)
+ .select('a, 'e, 'ltime)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamWindowJoin",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "ltime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "d", "e", "rtime")
+ ),
+ term("where", "AND(=(a, d), =(ltime, rtime))"),
+ term("join", "a", "ltime", "d", "e", "rtime"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "e", "PROCTIME(ltime) AS ltime")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
/**
* The time indicator can be accessed from non-time predicates now.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 1d7bab6..85929e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -187,6 +187,53 @@ class JoinITCase extends StreamingWithStateTestBase {
StreamITCase.compareWithList(expected)
}
+ /** test rowtime inner join with equi-times **/
+ @Test
+ def testRowTimeInnerJoinWithEquiTimeAttrs(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ StreamITCase.clear
+
+ val sqlQuery =
+ """
+ |SELECT t2.key, t2.id, t1.id
+ |FROM T1 as t1 join T2 as t2 ON
+ | t1.key = t2.key AND
+ | t2.rt = t1.rt
+ |""".stripMargin
+
+ val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+
+ data1.+=((4, 4000L, "A", 4000L))
+ data1.+=((5, 5000L, "A", 5000L))
+ data1.+=((6, 6000L, "A", 6000L))
+ data1.+=((6, 6000L, "B", 6000L))
+
+ val data2 = new mutable.MutableList[(String, String, Long)]
+ data2.+=(("A", "R-5", 5000L))
+ data2.+=(("B", "R-6", 6000L))
+
+ val t1 = env.fromCollection(data1)
+ .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+ .toTable(tEnv, 'id, 'tm, 'key, 'rt.rowtime)
+ val t2 = env.fromCollection(data2)
+ .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+ .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = new java.util.ArrayList[String]
+ expected.add("A,R-5,5")
+ expected.add("B,R-6,6")
+ StreamITCase.compareWithList(expected)
+ }
+
/** test rowtime inner join with other conditions **/
@Test
def testRowTimeInnerJoinWithOtherConditions(): Unit = {