You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/06/05 13:37:26 UTC

flink git commit: [FLINK-8430] [table] Implement stream-stream non-window full outer join

Repository: flink
Updated Branches:
  refs/heads/master e455285b6 -> 6ce94c1f9


[FLINK-8430] [table] Implement stream-stream non-window full outer join

This closes #6079.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ce94c1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ce94c1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ce94c1f

Branch: refs/heads/master
Commit: 6ce94c1f97ce357d0d10a113b472c19fc4e88c73
Parents: e455285
Author: hequn8128 <ch...@gmail.com>
Authored: Sat May 26 10:29:51 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Jun 5 15:13:58 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |   2 +-
 docs/dev/table/tableApi.md                      |   2 +-
 .../plan/nodes/datastream/DataStreamJoin.scala  |  27 +-
 .../table/runtime/join/NonWindowFullJoin.scala  |  93 +++++
 ...NonWindowFullJoinWithNonEquiPredicates.scala | 119 +++++++
 .../table/runtime/join/NonWindowOuterJoin.scala |   3 +
 ...onWindowOuterJoinWithNonEquiPredicates.scala |   3 +
 .../flink/table/plan/RetractionRulesTest.scala  |  55 +++
 .../table/runtime/harness/JoinHarnessTest.scala | 342 +++++++++++++++++++
 .../table/runtime/stream/sql/JoinITCase.scala   |  26 ++
 .../table/runtime/stream/table/JoinITCase.scala |  85 ++++-
 11 files changed, 741 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index f0ac45f..da78a40 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -398,7 +398,7 @@ FROM Orders INNER JOIN Product ON Orders.productId = Product.id
         <span class="label label-info">Result Updating</span>
       </td>
       <td>
-        <p>Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported. Full join is not supported in streaming yet.</p>
+        <p>Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.</p>
         <p><b>Note:</b> The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product) which are not supported and would cause a query to fail.</p>
 {% highlight sql %}
 SELECT *

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a7b9522..44436c4 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -512,7 +512,7 @@ Table result = left.join(right).where("a = d").select("a, b, e");
         <span class="label label-info">Result Updating</span>
       </td>
       <td>
-        <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined. Full join is not supported in streaming yet.</p>
+        <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
 {% highlight java %}
 Table left = tableEnv.fromDataSet(ds1, "a, b, c");
 Table right = tableEnv.fromDataSet(ds2, "d, e, f");

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
index c5295a1..1e2311f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.CRowKeySelector
-import org.apache.flink.table.runtime.join.{NonWindowInnerJoin, NonWindowLeftRightJoin, NonWindowLeftRightJoinWithNonEquiPredicates}
+import org.apache.flink.table.runtime.join._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 
@@ -139,14 +139,7 @@ class DataStreamJoin(
     val rightDataStream =
       right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
-    val connectOperator = joinType match {
-      case JoinRelType.INNER | JoinRelType.LEFT | JoinRelType.RIGHT =>
-        leftDataStream.connect(rightDataStream)
-      case _ =>
-        throw TableException(s"Unsupported join type '$joinType'. Currently only " +
-          s"non-window inner/left/right joins with at least one equality predicate are supported")
-    }
-
+    val connectOperator = leftDataStream.connect(rightDataStream)
     // input must not be nullable, because the runtime join function will make sure
     // the code-generated function won't process null inputs
     val generator = new FunctionCodeGenerator(
@@ -209,6 +202,22 @@ class DataStreamJoin(
           genFunction.code,
           joinType == JoinRelType.LEFT,
           queryConfig)
+      case JoinRelType.FULL if joinInfo.isEqui =>
+        new NonWindowFullJoin(
+          leftSchema.typeInfo,
+          rightSchema.typeInfo,
+          CRowTypeInfo(returnType),
+          genFunction.name,
+          genFunction.code,
+          queryConfig)
+      case JoinRelType.FULL =>
+        new NonWindowFullJoinWithNonEquiPredicates(
+          leftSchema.typeInfo,
+          rightSchema.typeInfo,
+          CRowTypeInfo(returnType),
+          genFunction.name,
+          genFunction.code,
+          queryConfig)
     }
 
     val joinOpName = joinToString(getRowType, joinCondition, joinType, getExpressionString)

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
new file mode 100644
index 0000000..d2bcb6a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connects data of the left and right streams. Only used for full outer joins without non-equi
+  * predicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function name without any non-equi condition
+  * @param genJoinFuncCode the function code without any non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowFullJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowOuterJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    false,
+    queryConfig) {
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    LOG.debug(s"Instantiating NonWindowFullJoin")
+  }
+
+  /**
+    * Puts an element from the input stream into state or retracts it from state, then searches
+    * the other side's state and outputs the records that meet the join condition. If there is no
+    * match, the input row is preserved and emitted padded with nulls. Records in state expire if
+    * a state retention time has been specified.
+    */
+  override def processElement(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      recordFromLeft: Boolean): Unit = {
+
+    val inputRow = value.row
+    updateCurrentSide(value, ctx, timerState, currentSideState)
+
+    cRowWrapper.reset()
+    cRowWrapper.setCollector(out)
+    cRowWrapper.setChange(value.change)
+
+    retractJoin(value, recordFromLeft, currentSideState, otherSideState)
+
+    // preserve the current input (padded with nulls) if no row from the other side matched.
+    if (cRowWrapper.getEmitCnt == 0) {
+      cRowWrapper.setTimes(1)
+      collectAppendNull(inputRow, recordFromLeft, cRowWrapper)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
new file mode 100644
index 0000000..b442a88
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connects data of the left and right streams. Only used for full outer joins with non-equi
+  * predicates. A MapState of type [Row, Long] is used to record how many rows matched the
+  * specified row. Full outer joins without non-equi predicates don't need it because rows from
+  * one side can always join rows from the other side as long as the join keys are the same.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function name of other non-equi condition
+  * @param genJoinFuncCode the function code of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowFullJoinWithNonEquiPredicates(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowOuterJoinWithNonEquiPredicates(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    false,
+    queryConfig) {
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    LOG.debug("Instantiating NonWindowFullJoinWithNonEquiPredicates.")
+  }
+
+  /**
+    * Puts an element from the input stream into state or retracts it from state, then searches
+    * the other side's state and outputs the records that meet the join condition. If there is no
+    * match, the input row is preserved and emitted padded with nulls. Records in state expire if
+    * a state retention time has been specified.
+    */
+  override def processElement(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      recordFromLeft: Boolean): Unit = {
+
+    val currentJoinCntState = getJoinCntState(joinCntState, recordFromLeft)
+    val inputRow = value.row
+    val cntAndExpiredTime = updateCurrentSide(value, ctx, timerState, currentSideState)
+    if (!value.change && cntAndExpiredTime.f0 <= 0) {
+      currentJoinCntState.remove(inputRow)
+    }
+
+    cRowWrapper.reset()
+    cRowWrapper.setCollector(out)
+    cRowWrapper.setChange(value.change)
+
+    val otherSideJoinCntState = getJoinCntState(joinCntState, !recordFromLeft)
+    retractJoinWithNonEquiPreds(value, recordFromLeft, otherSideState, otherSideJoinCntState)
+
+    // init the matched count only when a new row is added, i.e., its count changed from 0 to 1.
+    if (value.change && cntAndExpiredTime.f0 == 1) {
+      currentJoinCntState.put(inputRow, cRowWrapper.getEmitCnt)
+    }
+    // if there are no matched rows, emit the input row padded with nulls
+    if (cRowWrapper.getEmitCnt == 0) {
+      cRowWrapper.setTimes(1)
+      collectAppendNull(inputRow, recordFromLeft, cRowWrapper)
+    }
+  }
+
+  /**
+    * Removes expired records from the state of the side selected by isLeft by delegating to the
+    * overload that also takes joinCntState; presumably that overload registers a new timer if
+    * records remain and clears the matching join-count entries as well (not visible here).
+    */
+  override def expireOutTimeRow(
+      curTime: Long,
+      rowMapState: MapState[Row, JTuple2[Long, Long]],
+      timerState: ValueState[Long],
+      isLeft: Boolean,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+    expireOutTimeRow(curTime, rowMapState, timerState, isLeft, joinCntState, ctx)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
index 73b4b5b..8877b89 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
@@ -130,6 +130,7 @@ abstract class NonWindowOuterJoin(
       if (recordNum == 1 && value.change) {
         cRowWrapper.setChange(false)
         collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+        // recover for the next iteration
         cRowWrapper.setChange(true)
       }
       // do normal join
@@ -139,6 +140,8 @@ abstract class NonWindowOuterJoin(
       if (!value.change && recordNum == 0) {
         cRowWrapper.setChange(true)
         collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+        // recover for the next iteration
+        cRowWrapper.setChange(false)
       }
       // clear expired data. Note: clear after join to keep closer to the original semantics
       if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
index 315fa0d..6812a06 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
@@ -109,6 +109,7 @@ import org.apache.flink.types.Row
             // retract previous non matched result row
             cRowWrapper.setChange(false)
             collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+            // recover for the next iteration
             cRowWrapper.setChange(true)
           }
           // do normal join
@@ -121,6 +122,8 @@ import org.apache.flink.types.Row
             // output non matched result row
             cRowWrapper.setChange(true)
             collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+            // recover for the next iteration
+            cRowWrapper.setChange(false)
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index f999827..eb9d4b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -442,6 +442,61 @@ class RetractionRulesTest extends TableTestBase {
       )
     util.verifyTableTrait(resultTable, expected)
   }
+
+  @Test
+  def testFullJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+    // full outer join on b = bb followed by a projection; the plan's traits are verified below
+    val resultTable = lTable
+      .fullOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          "DataStreamScan(true, Acc)",
+          "DataStreamScan(true, Acc)",
+          "false, AccRetract"
+        ),
+        "false, AccRetract"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testAggFollowedWithFullJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+    // full outer join on b = bb followed by a group-by on a with a distinct count over c
+    val countDistinct = new CountDistinct
+    val resultTable = lTable
+      .fullOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+      .groupBy('a)
+      .select('a, countDistinct('c))
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          binaryNode(
+            "DataStreamJoin",
+            "DataStreamScan(true, Acc)",
+            "DataStreamScan(true, Acc)",
+            "true, AccRetract"
+          ),
+          "true, AccRetract"
+        ),
+        "false, Acc"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
 }
 
 class StreamTableTestForRetractionUtil extends StreamTableTestUtil {

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 0a511c7..cc5a1fd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -1561,4 +1561,346 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.close()
   }
+
+  @Test
+  def testNonWindowFullJoinWithoutNonEqualPred() {
+    // exercises NonWindowFullJoin with inserts, retractions and state expiry on both inputs
+    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
+      Array[TypeInformation[_]](
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO,
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO),
+      Array("a", "b", "c", "d")))
+
+    val joinProcessFunc = new NonWindowFullJoin(
+      rowType,
+      rowType,
+      joinReturnType,
+      "TestJoinFunction",
+      funcCode,
+      queryConfig)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+        operator,
+        new TupleRowKeySelector[Integer](0),
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO,
+        1, 1, 0)
+
+    testHarness.open()
+
+    // left stream input
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb"), change = true)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc"), change = true)))
+    assertEquals(1, testHarness.numProcessingTimeTimers())
+    // 1 left timer(5), 1 left key(1)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(2)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "ccc"), change = true)))
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+    // 1 left timer(5), 1 left key(1)
+    // 1 right timer(6), 1 right key(2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(3)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa"), change = true)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd"), change = true)))
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+    // 2 left timer(5,7), 2 left key(1,2)
+    // 1 right timer(6), 1 right key(2)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "ddd"), change = true)))
+    assertEquals(4, testHarness.numProcessingTimeTimers())
+    // 2 left timer(5,7), 2 left key(1,2)
+    // 2 right timer(6,7), 2 right key(1,2)
+    assertEquals(8, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(4)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa"), change = false)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd"), change = false)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = false)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "ddd"), change = false)))
+    assertEquals(4, testHarness.numProcessingTimeTimers())
+    // 2 left timer(5,7), 1 left key(1)
+    // 2 right timer(6,7), 1 right key(2)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(5)
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+    // 1 left timer(7)
+    // 2 right timer(6,7), 1 right key(2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(6)
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+    // 1 left timer(7)
+    // 1 right timer(7)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(7)
+    assertEquals(0, testHarness.numProcessingTimeTimers())
+    assertEquals(0, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(8)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), change = true)))
+
+    val result = testHarness.getOutput
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // processing time 1
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+    // processing time 2
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+    // processing time 3
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd", 2: JInt, "bbb"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd", 2: JInt, "ccc"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "aaa"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", 1: JInt, "aaa"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = true)))
+    // processing time 4
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd", 2: JInt, "bbb"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd", 2: JInt, "ccc"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "aaa"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", 1: JInt, "aaa"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+    // processing time 8
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+
+    verify(expectedOutput, result, new RowResultSortComparator())
+    testHarness.close()
+  }
+
+  @Test
+  def testNonWindowFullJoinWithNonEqualPred() {
+
+    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
+      Array[TypeInformation[_]](
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO,
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO),
+      Array("a", "b", "c", "d")))
+
+    val joinProcessFunc = new NonWindowFullJoinWithNonEquiPredicates(
+      rowType,
+      rowType,
+      joinReturnType,
+      "TestJoinFunction",
+      funcCodeWithNonEqualPred2,
+      queryConfig)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+        operator,
+        new TupleRowKeySelector[Integer](0),
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO,
+        1, 1, 0)
+
+    testHarness.open()
+
+    // left stream input
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb"), change = true)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc"), change = true)))
+    assertEquals(1, testHarness.numProcessingTimeTimers())
+    // 1 left timer(5), 1 left key(1), 1 left joincnt key(1)
+    assertEquals(3, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(2)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "ccc"), change = true)))
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+    // 1 left timer(5), 1 left key(1), 1 left joincnt key(1)
+    // 1 right timer(6), 1 right key(2), 1 right joincnt key(2)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(3)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa"), change = true)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd"), change = true)))
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+    // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
+    // 1 right timer(6), 1 right key(2), 1 right joincnt key(2)
+    assertEquals(9, testHarness.numKeyedStateEntries())
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "ddd"), change = true)))
+    assertEquals(4, testHarness.numProcessingTimeTimers())
+    // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
+    // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2)
+    assertEquals(12, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(4)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa"), change = false)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "ddd"), change = false)))
+    assertEquals(4, testHarness.numProcessingTimeTimers())
+    // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
+    // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2)
+    assertEquals(12, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(5)
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+    // 1 left timer(7), 1 left key(2), 1 left joincnt key(2)
+    // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2)
+    assertEquals(9, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(6)
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+    // 1 left timer(7), 1 left key(2), 1 left joincnt key(2)
+    // 1 right timer(7), 1 right key(1), 1 right joincnt key(1)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(7)
+    assertEquals(0, testHarness.numProcessingTimeTimers())
+    assertEquals(0, testHarness.numKeyedStateEntries())
+
+    testHarness.setProcessingTime(8)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), change = true)))
+
+    val result = testHarness.getOutput
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // processing time 1
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+    // processing time 2
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+    // processing time 3
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = true)))
+    // can not find matched row due to NonEquiJoinPred
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "ddd", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = true)))
+    // can not find matched row due to NonEquiJoinPred
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+    // processing time 4
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+    // processing time 8
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+
+    verify(expectedOutput, result, new RowResultSortComparator())
+    testHarness.close()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index ea7c651..8d42da5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -1198,6 +1198,32 @@ class JoinITCase extends StreamingWithStateTestBase {
     env.execute()
     assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
+
+  @Test
+  def testFullOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt",
+      "null,Hallo Welt wie", "null,Hallo Welt wie gehts?", "null,ABC", "null,BCD",
+      "null,CDE", "null,DEF", "null,EFG", "null,FGH", "null,GHI", "null,HIJ",
+      "null,IJK", "null,JKL", "null,KLM")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
 }
 
 private class Row4WatermarkExtractor

http://git-wip-us.apache.org/repos/asf/flink/blob/6ce94c1f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index 8285a3f..e145448 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -588,16 +588,91 @@ class JoinITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
-  @Test(expected = classOf[TableException])
-  def testFullOuterJoin(): Unit = {
+  @Test
+  def testFullOuterJoinWithMultipleKeys(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
     env.setStateBackend(getStateBackend)
 
-    val leftTable = env.fromCollection(List((1, 2))).toTable(tEnv, 'a, 'b)
-    val rightTable = env.fromCollection(List((1, 2))).toTable(tEnv, 'bb, 'c)
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "null,Hallo Welt wie",
+      "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "null,BCD", "null,CDE",
+      "null,DEF", "null,EFG", "null,FGH", "null,GHI", "I am fine.,HIJ",
+      "I am fine.,IJK", "null,JKL", "null,KLM", "Luke Skywalker,null",
+      "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null",
+      "Comment#5,null", "Comment#6,null", "Comment#7,null", "Comment#8,null",
+      "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+      "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      "Hello world, how are you?,null")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
 
-    leftTable.fullOuterJoin(rightTable, 'a ==='bb).toAppendStream[Row]
+  @Test
+  def testFullJoinWithNonEquiJoinPred(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+    val expected = Seq(
+      // join matches
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC",
+      "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+      // preserved left
+      "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null",
+      "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null",
+      "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+      "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      // preserved right
+      "null,Hallo Welt wie", "null,CDE", "null,DEF", "null,EFG", "null,FGH", "null,GHI", "null,JKL",
+      "null,KLM")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testFullJoinWithLeftLocalPred(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b >= 2 && 'h === 1).select('c, 'g)
+
+    val expected = Seq(
+      // join matches
+      "Hello,Hallo Welt wie", "Hello world, how are you?,DEF", "Hello world, how are you?,EFG",
+      "I am fine.,GHI",
+      // preserved left
+      "Hi,null", "Hello world,null", "Luke Skywalker,null",
+      "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null",
+      "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+      "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      // preserved right
+      "null,Hallo", "null,Hallo Welt", "null,Hallo Welt wie gehts?", "null,ABC", "null,BCD",
+      "null,CDE", "null,FGH", "null,HIJ", "null,IJK", "null,JKL", "null,KLM")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 }