Posted to commits@flink.apache.org by tw...@apache.org on 2018/05/17 09:06:46 UTC

[2/2] flink git commit: [FLINK-8428] [table] Implement stream-stream non-window left outer join

[FLINK-8428] [table] Implement stream-stream non-window left outer join

Two different CoProcessFunctions are used to implement the left join for performance reasons: one for left joins with non-equal predicates, the other for left joins without non-equal predicates. The main difference between them is that, for a left join without non-equal predicates, left rows can always find matching right rows as long as the join keys are the same.

- Left join with non-equal predicates: Use a MapState to track how many rows from the right table (joinCnt) are matched by the current left row. If joinCnt is 0, output the left row padded with NULLs on the right. If joinCnt changes from 0 to 1, retract the previously emitted NULL-padded result and output the matched result. If joinCnt changes from 1 to 0 when a right retract input is received, retract the previously matched result and output the left row padded with NULLs on the right. A minimal sketch of these transitions follows the list.
- Left join without non-equal predicates: joinCnt does not need to be maintained, because it always equals the size of the right state, so checking the state size is sufficient.
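
For illustration, a minimal sketch of the joinCnt transitions described above (all names are hypothetical; plain Scala maps stand in for Flink's MapState, rows are modeled as strings, and Emit/Retract stand for CRow outputs with change = true/false):

import scala.collection.mutable

object JoinCntSketch {
  sealed trait Msg
  final case class Emit(row: String) extends Msg
  final case class Retract(row: String) extends Msg

  // joinCnt per left row: how many right rows currently match it
  private val joinCnt = mutable.Map.empty[String, Long]
  val out = mutable.Buffer.empty[Msg]

  private def nullPadded(left: String): String = s"$left, NULL"

  // a new left row arrives and no right row matches it yet
  def onLeftRowAdded(left: String): Unit =
    if (joinCnt.getOrElseUpdate(left, 0L) == 0L) out += Emit(nullPadded(left))

  // a matching right row arrives for `left`
  def onRightMatchAdded(left: String, joined: String): Unit = {
    val cnt = joinCnt.getOrElse(left, 0L)
    if (cnt == 0L) out += Retract(nullPadded(left)) // 0 -> 1: retract the NULL result
    out += Emit(joined)
    joinCnt(left) = cnt + 1
  }

  // a matching right row is retracted for `left`
  def onRightMatchRetracted(left: String, joined: String): Unit = {
    val cnt = joinCnt(left) - 1
    out += Retract(joined)
    if (cnt == 0L) out += Emit(nullPadded(left)) // 1 -> 0: fall back to the NULL result
    joinCnt(left) = cnt
  }
}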

Table Modes:
Left join will generate retractions, so the DataStreamRel node of a left join will work under AccRetract mode. Also, the table mode of the dynamic table produced by a left join is Update Mode, even if the table does not include a key definition. A usage sketch of consuming such an updating result is shown below.
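
A hedged sketch of consuming such an updating result with the Table API (tableEnv, leftStream, rightStream and the field names are assumptions, not part of this commit):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// the left outer join result is updating, so it must be consumed as a
// retract stream; the Boolean flag marks accumulate (true) vs. retract (false)
val left = tableEnv.fromDataStream(leftStream, 'a, 'b)
val right = tableEnv.fromDataStream(rightStream, 'd, 'e)

val result = left
  .leftOuterJoin(right, 'a === 'd) // at least one equality predicate is required
  .select('a, 'b, 'e)

tableEnv.toRetractStream[Row](result).print()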

This closes #5327.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b95ba39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b95ba39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b95ba39

Branch: refs/heads/master
Commit: 8b95ba399ef4ae1523e95a44d6678ef807ead6d8
Parents: 489e428
Author: hequn8128 <ch...@gmail.com>
Authored: Sun Jan 21 12:54:08 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Thu May 17 10:59:00 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |   4 +-
 docs/dev/table/tableApi.md                      |   4 +-
 .../plan/nodes/datastream/DataStreamJoin.scala  |  45 ++-
 .../plan/nodes/datastream/DataStreamRel.scala   |   5 +
 .../datastream/DataStreamRetractionRules.scala  |  32 +-
 .../table/plan/util/UpdatingPlanChecker.scala   |  77 ++---
 .../CRowWrappingMultiOutputCollector.scala      |  50 ---
 .../join/CRowWrappingMultiOutputCollector.scala |  70 ++++
 .../table/runtime/join/CountingCollector.scala  |  39 +++
 .../table/runtime/join/NonWindowInnerJoin.scala | 235 ++------------
 .../table/runtime/join/NonWindowJoin.scala      | 296 +++++++++++++++++
 .../runtime/join/NonWindowLeftRightJoin.scala   |  95 ++++++
 ...ndowLeftRightJoinWithNonEquiPredicates.scala | 119 +++++++
 .../table/runtime/join/NonWindowOuterJoin.scala | 187 +++++++++++
 ...onWindowOuterJoinWithNonEquiPredicates.scala | 194 +++++++++++
 .../validation/TableSinkValidationTest.scala    |  16 +
 .../flink/table/plan/RetractionRulesTest.scala  |  82 ++++-
 .../table/plan/UpdatingPlanCheckerTest.scala    | 102 ++++++
 .../table/runtime/batch/sql/JoinITCase.scala    |  12 +-
 .../table/runtime/batch/table/JoinITCase.scala  |  18 +-
 .../table/runtime/harness/JoinHarnessTest.scala | 288 +++++++++++++++++
 .../table/runtime/stream/sql/JoinITCase.scala   | 281 ++++++++++++----
 .../table/runtime/stream/table/JoinITCase.scala | 322 ++++++++++++++++++-
 .../runtime/stream/table/TableSinkITCase.scala  |  21 ++
 24 files changed, 2164 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 8aa1bd7..47fbe94 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -394,9 +394,11 @@ FROM Orders INNER JOIN Product ON Orders.productId = Product.id
     <tr>
       <td><strong>Outer Equi-join</strong><br>
         <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
+        <span class="label label-info">Result Updating</span>
       </td>
       <td>
-        <p>Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.</p>
+        <p>Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported. Right and full joins are not supported in streaming yet.</p>
         <p><b>Note:</b> The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product) which are not supported and would cause a query to fail.</p>
 {% highlight sql %}
 SELECT *

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 8c75078..ebbe87c 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -508,9 +508,11 @@ Table result = left.join(right).where("a = d").select("a, b, e");
       <td>
         <strong>Outer Joins</strong><br>
         <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
+        <span class="label label-info">Result Updating</span>
       </td>
       <td>
-        <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+        <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined. Right and full joins are not supported in streaming yet.</p>
 {% highlight java %}
 Table left = tableEnv.fromDataSet(ds1, "a, b, c");
 Table right = tableEnv.fromDataSet(ds2, "d, e, f");

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
index 853006f..e7ce3bc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.CRowKeySelector
-import org.apache.flink.table.runtime.join.NonWindowInnerJoin
+import org.apache.flink.table.runtime.join.{NonWindowInnerJoin, NonWindowLeftRightJoin, NonWindowLeftRightJoinWithNonEquiPredicates}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 
@@ -60,6 +60,9 @@ class DataStreamJoin(
 
   override def needsUpdatesAsRetraction: Boolean = true
 
+  // outer join will generate retractions
+  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamJoin(
       cluster,
@@ -137,10 +140,10 @@ class DataStreamJoin(
       right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
     val (connectOperator, nullCheck) = joinType match {
-      case JoinRelType.INNER => (leftDataStream.connect(rightDataStream), false)
+      case JoinRelType.INNER | JoinRelType.LEFT => (leftDataStream.connect(rightDataStream), false)
       case _ =>
         throw TableException(s"Unsupported join type '$joinType'. Currently only " +
-          s"non-window inner joins with at least one equality predicate are supported")
+          s"non-window inner/left joins with at least one equality predicate are supported")
     }
 
     val generator = new FunctionCodeGenerator(
@@ -176,14 +179,34 @@ class DataStreamJoin(
       body,
       returnType)
 
-    val coMapFun =
-      new NonWindowInnerJoin(
-        leftSchema.typeInfo,
-        rightSchema.typeInfo,
-        CRowTypeInfo(returnType),
-        genFunction.name,
-        genFunction.code,
-        queryConfig)
+    val coMapFun = joinType match {
+      case JoinRelType.INNER =>
+        new NonWindowInnerJoin(
+          leftSchema.typeInfo,
+          rightSchema.typeInfo,
+          CRowTypeInfo(returnType),
+          genFunction.name,
+          genFunction.code,
+          queryConfig)
+      case JoinRelType.LEFT if joinInfo.isEqui =>
+        new NonWindowLeftRightJoin(
+          leftSchema.typeInfo,
+          rightSchema.typeInfo,
+          CRowTypeInfo(returnType),
+          genFunction.name,
+          genFunction.code,
+          joinType == JoinRelType.LEFT,
+          queryConfig)
+      case JoinRelType.LEFT =>
+        new NonWindowLeftRightJoinWithNonEquiPredicates(
+          leftSchema.typeInfo,
+          rightSchema.typeInfo,
+          CRowTypeInfo(returnType),
+          genFunction.name,
+          genFunction.code,
+          joinType == JoinRelType.LEFT,
+          queryConfig)
+    }
 
     val joinOpName = joinToString(getRowType, joinCondition, joinType, getExpressionString)
     connectOperator

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index fc5570d..5834d61 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -53,4 +53,9 @@ trait DataStreamRel extends FlinkRelNode {
     */
   def consumesRetractions: Boolean = false
 
+  /**
+    * Whether the [[DataStreamRel]] produces retraction messages.
+    * Even if it does not produce retractions itself, it might still forward retraction messages.
+    */
+  def producesRetractions: Boolean = false
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index f0b725d..9eb5574 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -190,15 +190,6 @@ object DataStreamRetractionRules {
     "SetAccModeRule") {
 
     /**
-      * Checks if a [[RelNode]] produces update and delete changes.
-      */
-    def producesUpdates(relNode: RelNode): Boolean = {
-      relNode match {
-        case dsr: DataStreamRel => dsr.producesUpdates
-      }
-    }
-
-    /**
       * Set [[AccMode.AccRetract]] to a [[RelNode]].
       */
     def setAccRetract(relNode: RelNode): RelNode = {
@@ -207,28 +198,17 @@ object DataStreamRetractionRules {
     }
 
     /**
-      * Checks if a [[RelNode]] consumes retraction messages instead of forwarding them.
-      * The node might or might not produce new retraction messages.
-      * This is checked by [[producesRetractions()]].
-      */
-    def consumesRetractions(relNode: RelNode): Boolean = {
-      relNode match {
-        case dsr: DataStreamRel => dsr.consumesRetractions
-      }
-    }
-
-    /**
-      * Checks if a [[RelNode]] produces retraction messages.
+      * Checks if a [[DataStreamRel]] produces retraction messages.
       */
-    def producesRetractions(node: RelNode): Boolean = {
-      sendsUpdatesAsRetraction(node) && producesUpdates(node)
+    def producesRetractions(node: DataStreamRel): Boolean = {
+      sendsUpdatesAsRetraction(node) && node.producesUpdates || node.producesRetractions
     }
 
     /**
-      * Checks if a [[RelNode]] forwards retraction messages from its children.
+      * Checks if a [[DataStreamRel]] forwards retraction messages from its children.
       */
-    def forwardsRetractions(parent: RelNode, children: Seq[RelNode]): Boolean = {
-      children.exists(c => isAccRetract(c)) && !consumesRetractions(parent)
+    def forwardsRetractions(parent: DataStreamRel, children: Seq[RelNode]): Boolean = {
+      children.exists(c => isAccRetract(c)) && !parent.consumesRetractions
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index c1ebac8..4b7d0ed 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.plan.util
 
-import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
 import org.apache.calcite.sql.SqlKind
@@ -54,7 +53,7 @@ object UpdatingPlanChecker {
 
     override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
       node match {
-        case s: DataStreamRel if s.producesUpdates =>
+        case s: DataStreamRel if s.producesUpdates || s.producesRetractions =>
           isAppendOnly = false
         case _ =>
           super.visit(node, ordinal, parent)
@@ -149,45 +148,39 @@ object UpdatingPlanChecker {
           }
 
         case j: DataStreamJoin =>
-          val joinType = j.getJoinType
-          joinType match {
-            case JoinRelType.INNER =>
-              // get key(s) for inner join
-              val lInKeys = visit(j.getLeft)
-              val rInKeys = visit(j.getRight)
-              if (lInKeys.isEmpty || rInKeys.isEmpty) {
-                None
-              } else {
-                // Output of inner join must have keys if left and right both contain key(s).
-                // Key groups from both side will be merged by join equi-predicates
-                val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
-                val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
-                val joinNames = j.getRowType.getFieldNames
-
-                // if right field names equal to left field names, calcite will rename right
-                // field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b)
-                // to T2(pk0, b).
-                val rInNamesToJoinNamesMap = rInNames
-                  .zip(joinNames.subList(lInNames.size, joinNames.length))
-                  .toMap
+          // get key(s) for join
+          val lInKeys = visit(j.getLeft)
+          val rInKeys = visit(j.getRight)
+          if (lInKeys.isEmpty || rInKeys.isEmpty) {
+            None
+          } else {
+            // Output of join must have keys if left and right both contain key(s).
+            // Key groups from both sides will be merged by join equi-predicates
+            val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
+            val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
+            val joinNames = j.getRowType.getFieldNames
+
+            // if right field names equal to left field names, calcite will rename right
+            // field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b)
+            // to T2(pk0, b).
+            val rInNamesToJoinNamesMap = rInNames
+              .zip(joinNames.subList(lInNames.size, joinNames.length))
+              .toMap
 
-                val lJoinKeys: Seq[String] = j.getJoinInfo.leftKeys
-                  .map(lInNames.get(_))
-                val rJoinKeys: Seq[String] = j.getJoinInfo.rightKeys
-                  .map(rInNames.get(_))
-                  .map(rInNamesToJoinNamesMap(_))
+            val lJoinKeys: Seq[String] = j.getJoinInfo.leftKeys
+              .map(lInNames.get(_))
+            val rJoinKeys: Seq[String] = j.getJoinInfo.rightKeys
+              .map(rInNames.get(_))
+              .map(rInNamesToJoinNamesMap(_))
 
-                val inKeys: Seq[(String, String)] = lInKeys.get ++ rInKeys.get
-                    .map(e => (rInNamesToJoinNamesMap(e._1), rInNamesToJoinNamesMap(e._2)))
+            val inKeys: Seq[(String, String)] = lInKeys.get ++ rInKeys.get
+              .map(e => (rInNamesToJoinNamesMap(e._1), rInNamesToJoinNamesMap(e._2)))
 
-                getOutputKeysForInnerJoin(
-                  joinNames,
-                  inKeys,
-                  lJoinKeys.zip(rJoinKeys)
-                )
-              }
-            case _ =>
-              throw new UnsupportedOperationException(s"Unsupported join type '$joinType'")
+            getOutputKeysForNonWindowJoin(
+              joinNames,
+              inKeys,
+              lJoinKeys.zip(rJoinKeys)
+            )
           }
         case _: DataStreamRel =>
           // anything else does not forward keys, so we can stop
@@ -196,14 +189,14 @@ object UpdatingPlanChecker {
     }
 
     /**
-      * Get output keys for non-window inner join according to it's inputs.
+      * Get output keys for non-window join according to its inputs.
       *
       * @param inNames  Field names of join
       * @param inKeys   Input keys of join
-      * @param joinKeys JoinKeys of inner join
-      * @return Return output keys of inner join
+      * @param joinKeys JoinKeys of join
+      * @return Return output keys of join
       */
-    def getOutputKeysForInnerJoin(
+    def getOutputKeysForNonWindowJoin(
         inNames: Seq[String],
         inKeys: Seq[(String, String)],
         joinKeys: Seq[(String, String)])

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingMultiOutputCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingMultiOutputCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingMultiOutputCollector.scala
deleted file mode 100644
index d551111..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingMultiOutputCollector.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-
-/**
-  * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times.
-  */
-class CRowWrappingMultiOutputCollector() extends Collector[Row] {
-
-  private var out: Collector[CRow] = _
-  private val outCRow: CRow = new CRow()
-  private var times: Long = 0L
-
-  def setCollector(collector: Collector[CRow]): Unit = this.out = collector
-
-  def setChange(change: Boolean): Unit = this.outCRow.change = change
-
-  def setTimes(times: Long): Unit = this.times = times
-
-  override def collect(record: Row): Unit = {
-    outCRow.row = record
-    var i: Long = 0L
-    while (i < times) {
-      out.collect(outCRow)
-      i += 1
-    }
-  }
-
-  override def close(): Unit = out.close()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala
new file mode 100644
index 0000000..9b48c31
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. This collector
+  * can also be used to count the number of output records.
+  */
+class CRowWrappingMultiOutputCollector extends Collector[Row] {
+
+  private var out: Collector[CRow] = _
+  private val outCRow: CRow = new CRow(null, true)
+  // times for collect
+  private var times: Long = 0L
+  // count how many records have been emitted
+  private var emitCnt: Long = 0L
+
+  def setCollector(collector: Collector[CRow]): Unit = this.out = collector
+
+  def setChange(change: Boolean): Unit = this.outCRow.change = change
+
+  def setRow(row: Row): Unit = this.outCRow.row = row
+
+  def getRow: Row = this.outCRow.row
+
+  def setTimes(times: Long): Unit = this.times = times
+
+  def setEmitCnt(emitted: Long): Unit = this.emitCnt = emitted
+
+  def getEmitCnt: Long = emitCnt
+
+  override def collect(record: Row): Unit = {
+    outCRow.row = record
+    emitCnt += times
+    var i: Long = 0L
+    while (i < times) {
+      out.collect(outCRow)
+      i += 1
+    }
+  }
+
+  def reset(): Unit = {
+    this.times = 0
+    this.emitCnt = 0
+  }
+
+  override def close(): Unit = out.close()
+}
+
+
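
A hedged usage sketch of this collector's contract (the helper and its parameters are hypothetical; it mirrors how the join functions in this commit drive the collector):

import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// emit the joined output once per stored copy of the matching row and report
// how many records the join function produced (0 when a non-equi condition
// filtered everything out)
def emitMatches(
    wrapper: CRowWrappingMultiOutputCollector,
    out: Collector[CRow],
    inputRow: Row,
    otherSideRow: Row,
    otherSideCnt: Long,
    isAccumulate: Boolean,
    join: (Row, Row, Collector[Row]) => Unit): Long = {
  wrapper.reset()                 // clear the emit counter and times
  wrapper.setCollector(out)
  wrapper.setChange(isAccumulate) // true = accumulate, false = retract
  wrapper.setTimes(otherSideCnt)  // each collect() call emits this many CRows
  join(inputRow, otherSideRow, wrapper)
  wrapper.getEmitCnt
}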

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CountingCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CountingCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CountingCollector.scala
new file mode 100644
index 0000000..46c33cc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CountingCollector.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * This collector is used to count how many rows have been collected. It does not actually
+  * collect any data.
+  */
+class CountingCollector extends Collector[Row] {
+  // count how many records may be emitted
+  private var emitCnt: Long = 0L
+
+  override def collect(record: Row): Unit = emitCnt += 1
+
+  def reset(): Unit = emitCnt = 0
+
+  def getEmitCnt: Long = emitCnt
+
+  override def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
index 6fef701..e511ed1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
@@ -15,28 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.flink.table.runtime.join
 
-import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.api.{StreamQueryConfig, Types}
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.table.typeutils.TypeCheckUtils
-import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
-import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
-
 /**
   * Connect data for left stream and right stream. Only use for innerJoin.
   *
@@ -54,135 +44,17 @@ class NonWindowInnerJoin(
     genJoinFuncName: String,
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
-  extends CoProcessFunction[CRow, CRow, CRow]
-  with Compiler[FlatJoinFunction[Row, Row, Row]]
-  with Logging {
-
-  // check if input types implement proper equals/hashCode
-  validateEqualsHashCode("join", leftType)
-  validateEqualsHashCode("join", rightType)
-
-  // state to hold left stream element
-  private var leftState: MapState[Row, JTuple2[Int, Long]] = _
-  // state to hold right stream element
-  private var rightState: MapState[Row, JTuple2[Int, Long]] = _
-  private var cRowWrapper: CRowWrappingMultiOutputCollector = _
-
-  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
-  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
-  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
-
-  // state to record last timer of left stream, 0 means no timer
-  private var leftTimer: ValueState[Long] = _
-  // state to record last timer of right stream, 0 means no timer
-  private var rightTimer: ValueState[Long] = _
-
-  // other condition function
-  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
 
   override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
-                s"Code:\n$genJoinFuncCode")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genJoinFuncName,
-      genJoinFuncCode)
-    LOG.debug("Instantiating JoinFunction.")
-    joinFunction = clazz.newInstance()
-
-    // initialize left and right state, the first element of tuple2 indicates how many rows of
-    // this row, while the second element represents the expired time of this row.
-    val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
-    val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
-      "left", leftType, tupleTypeInfo)
-    val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
-      "right", rightType, tupleTypeInfo)
-    leftState = getRuntimeContext.getMapState(leftStateDescriptor)
-    rightState = getRuntimeContext.getMapState(rightStateDescriptor)
-
-    // initialize timer state
-    val valueStateDescriptor1 = new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
-    leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
-    val valueStateDescriptor2 = new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
-    rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
-
-    cRowWrapper = new CRowWrappingMultiOutputCollector()
-  }
-
-  /**
-    * Process left stream records
-    *
-    * @param valueC The input value.
-    * @param ctx    The ctx to register timer or get current time
-    * @param out    The collector for returning result values.
-    *
-    */
-  override def processElement1(
-      valueC: CRow,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
-
-    processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true)
-  }
-
-  /**
-    * Process right stream records
-    *
-    * @param valueC The input value.
-    * @param ctx    The ctx to register timer or get current time
-    * @param out    The collector for returning result values.
-    *
-    */
-  override def processElement2(
-      valueC: CRow,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
-
-    processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false)
-  }
-
-
-  /**
-    * Called when a processing timer trigger.
-    * Expire left/right records which are expired in left and right state.
-    *
-    * @param timestamp The timestamp of the firing timer.
-    * @param ctx       The ctx to register timer or get current time
-    * @param out       The collector for returning result values.
-    */
-  override def onTimer(
-      timestamp: Long,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
-      out: Collector[CRow]): Unit = {
-
-    if (stateCleaningEnabled && leftTimer.value == timestamp) {
-      expireOutTimeRow(
-        timestamp,
-        leftState,
-        leftTimer,
-        ctx
-      )
-    }
-
-    if (stateCleaningEnabled && rightTimer.value == timestamp) {
-      expireOutTimeRow(
-        timestamp,
-        rightState,
-        rightTimer,
-        ctx
-      )
-    }
-  }
-
-  def getNewExpiredTime(
-      curProcessTime: Long,
-      oldExpiredTime: Long): Long = {
-
-    if (stateCleaningEnabled && curProcessTime + minRetentionTime > oldExpiredTime) {
-      curProcessTime + maxRetentionTime
-    } else {
-      oldExpiredTime
-    }
+    super.open(parameters)
+    LOG.debug("Instantiating NonWindowInnerJoin.")
   }
 
   /**
@@ -190,102 +62,33 @@ class NonWindowInnerJoin(
     * output records meet the condition. Records will be expired in state if state retention time
     * has been specified.
     */
-  def processElement(
+  override def processElement(
       value: CRow,
       ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
       out: Collector[CRow],
       timerState: ValueState[Long],
-      currentSideState: MapState[Row, JTuple2[Int, Long]],
-      otherSideState: MapState[Row, JTuple2[Int, Long]],
+      currentSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
       isLeft: Boolean): Unit = {
 
     val inputRow = value.row
+    updateCurrentSide(value, ctx, timerState, currentSideState)
+
     cRowWrapper.setCollector(out)
     cRowWrapper.setChange(value.change)
-
-    val curProcessTime = ctx.timerService.currentProcessingTime
-    val oldCntAndExpiredTime = currentSideState.get(inputRow)
-    val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
-      JTuple2.of(0, -1L)
-    } else {
-      oldCntAndExpiredTime
-    }
-
-    cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
-    if (stateCleaningEnabled && timerState.value() == 0) {
-      timerState.update(cntAndExpiredTime.f1)
-      ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
-    }
-
-    // update current side stream state
-    if (!value.change) {
-      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
-      if (cntAndExpiredTime.f0 <= 0) {
-        currentSideState.remove(inputRow)
-      } else {
-        currentSideState.put(inputRow, cntAndExpiredTime)
-      }
-    } else {
-      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
-      currentSideState.put(inputRow, cntAndExpiredTime)
-    }
-
     val otherSideIterator = otherSideState.iterator()
     // join other side data
     while (otherSideIterator.hasNext) {
       val otherSideEntry = otherSideIterator.next()
       val otherSideRow = otherSideEntry.getKey
-      val cntAndExpiredTime = otherSideEntry.getValue
+      val otherSideCntAndExpiredTime = otherSideEntry.getValue
       // join
-      cRowWrapper.setTimes(cntAndExpiredTime.f0)
-      if (isLeft) {
-        joinFunction.join(inputRow, otherSideRow, cRowWrapper)
-      } else {
-        joinFunction.join(otherSideRow, inputRow, cRowWrapper)
-      }
+      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+      callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)
       // clear expired data. Note: clear after join to keep closer to the original semantics
-      if (stateCleaningEnabled && curProcessTime >= cntAndExpiredTime.f1) {
+      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
         otherSideIterator.remove()
       }
     }
   }
-
-
-  /**
-    * Removes records which are expired from the state. Registers a new timer if the state still
-    * holds records after the clean-up.
-    */
-  private def expireOutTimeRow(
-      curTime: Long,
-      rowMapState: MapState[Row, JTuple2[Int, Long]],
-      timerState: ValueState[Long],
-      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
-
-    val rowMapIter = rowMapState.iterator()
-    var validTimestamp: Boolean = false
-
-    while (rowMapIter.hasNext) {
-      val mapEntry = rowMapIter.next()
-      val recordExpiredTime = mapEntry.getValue.f1
-      if (recordExpiredTime <= curTime) {
-        rowMapIter.remove()
-      } else {
-        // we found a timestamp that is still valid
-        validTimestamp = true
-      }
-    }
-
-    // If the state has non-expired timestamps, register a new timer.
-    // Otherwise clean the complete state for this input.
-    if (validTimestamp) {
-      val cleanupTime = curTime + maxRetentionTime
-      ctx.timerService.registerProcessingTimeTimer(cleanupTime)
-      timerState.update(cleanupTime)
-    } else {
-      timerState.clear()
-      rowMapState.clear()
-    }
-  }
-
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
new file mode 100644
index 0000000..51db755
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream-stream non-window Join.
+  *
+  * @param leftType          the input type of left stream
+  * @param rightType         the input type of right stream
+  * @param resultType        the output type of join
+  * @param genJoinFuncName   the name of the generated function for the non-equi condition
+  * @param genJoinFuncCode   the code of the generated function for the non-equi condition
+  * @param queryConfig       the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Long, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Long, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // current processing time
+  protected var curProcessTime: Long = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+                s"Code:\n$genJoinFuncCode")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+
+    joinFunction = clazz.newInstance()
+
+    // initialize left and right state; the first element of the tuple indicates how many copies
+    // of this row exist, while the second element represents the row's expiration time.
+    val tupleTypeInfo = new TupleTypeInfo[JTuple2[Long, Long]](Types.LONG, Types.LONG)
+    val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Long, Long]](
+      "left", leftType, tupleTypeInfo)
+    val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Long, Long]](
+      "right", rightType, tupleTypeInfo)
+    leftState = getRuntimeContext.getMapState(leftStateDescriptor)
+    rightState = getRuntimeContext.getMapState(rightStateDescriptor)
+
+    // initialize timer state
+    val valueStateDescriptor1 = new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
+    val valueStateDescriptor2 = new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+    rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
+
+    cRowWrapper = new CRowWrappingMultiOutputCollector()
+    LOG.debug("Instantiating NonWindowJoin.")
+  }
+
+  /**
+    * Process left stream records
+    *
+    * @param valueC The input value.
+    * @param ctx    The ctx to register timer or get current time
+    * @param out    The collector for returning result values.
+    *
+    */
+  override def processElement1(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true)
+  }
+
+  /**
+    * Process right stream records
+    *
+    * @param valueC The input value.
+    * @param ctx    The ctx to register timer or get current time
+    * @param out    The collector for returning result values.
+    *
+    */
+  override def processElement2(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false)
+  }
+
+  /**
+    * Called when a processing-time timer fires.
+    * Expires records in the left and right state whose retention time has passed.
+    *
+    * @param timestamp The timestamp of the firing timer.
+    * @param ctx       The ctx to register timer or get current time
+    * @param out       The collector for returning result values.
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    if (stateCleaningEnabled && leftTimer.value == timestamp) {
+      expireOutTimeRow(
+        timestamp,
+        leftState,
+        leftTimer,
+        isLeft = true,
+        ctx
+      )
+    }
+
+    if (stateCleaningEnabled && rightTimer.value == timestamp) {
+      expireOutTimeRow(
+        timestamp,
+        rightState,
+        rightTimer,
+        isLeft = false,
+        ctx
+      )
+    }
+  }
+
+  def getNewExpiredTime(curProcessTime: Long, oldExpiredTime: Long): Long = {
+    if (stateCleaningEnabled && curProcessTime + minRetentionTime > oldExpiredTime) {
+      curProcessTime + maxRetentionTime
+    } else {
+      oldExpiredTime
+    }
+  }
+
+  /**
+    * Removes records which are expired from the state. Registers a new timer if the state still
+    * holds records after the clean-up.
+    */
+  def expireOutTimeRow(
+      curTime: Long,
+      rowMapState: MapState[Row, JTuple2[Long, Long]],
+      timerState: ValueState[Long],
+      isLeft: Boolean,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+    val rowMapIter = rowMapState.iterator()
+    var validTimestamp: Boolean = false
+
+    while (rowMapIter.hasNext) {
+      val mapEntry = rowMapIter.next()
+      val recordExpiredTime = mapEntry.getValue.f1
+      if (recordExpiredTime <= curTime) {
+        rowMapIter.remove()
+      } else {
+        // we found a timestamp that is still valid
+        validTimestamp = true
+      }
+    }
+
+    // If the state has non-expired timestamps, register a new timer.
+    // Otherwise clean the complete state for this input.
+    if (validTimestamp) {
+      val cleanupTime = curTime + maxRetentionTime
+      ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+      timerState.update(cleanupTime)
+    } else {
+      timerState.clear()
+      rowMapState.clear()
+    }
+  }
+
+  /**
+    * Puts or retracts an element from the input stream into state and searches the other state to
+    * output records that meet the condition. Records will be expired in state if state retention time
+    * has been specified.
+    */
+  def processElement(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      isLeft: Boolean): Unit
+
+  /**
+    * Updates the current side's state. Puts the row with its count and expiration time into the
+    * row state. Also, registers a timer if state retention time has been specified.
+    *
+    * @param value            The input CRow
+    * @param ctx              The ctx to register timer or get current time
+    * @param timerState       The state to record last timer
+    * @param currentSideState The state to hold current side stream element
+    * @return The row number and expired time for current input row
+    */
+  def updateCurrentSide(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Long, Long]]): JTuple2[Long, Long] = {
+
+    val inputRow = value.row
+    curProcessTime = ctx.timerService.currentProcessingTime
+    val oldCntAndExpiredTime = currentSideState.get(inputRow)
+    val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+      JTuple2.of(0L, -1L)
+    } else {
+      oldCntAndExpiredTime
+    }
+
+    cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+    // update timer if necessary
+    if (stateCleaningEnabled && timerState.value() == 0) {
+      timerState.update(cntAndExpiredTime.f1)
+      ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
+    }
+
+    // update current side stream state
+    if (!value.change) {
+      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
+      if (cntAndExpiredTime.f0 <= 0) {
+        currentSideState.remove(inputRow)
+      } else {
+        currentSideState.put(inputRow, cntAndExpiredTime)
+      }
+    } else {
+      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
+      currentSideState.put(inputRow, cntAndExpiredTime)
+    }
+
+    cntAndExpiredTime
+  }
+
+  def callJoinFunction(
+      inputRow: Row,
+      inputRowFromLeft: Boolean,
+      otherSideRow: Row,
+      cRowWrapper: Collector[Row]): Unit = {
+
+    if (inputRowFromLeft) {
+      joinFunction.join(inputRow, otherSideRow, cRowWrapper)
+    } else {
+      joinFunction.join(otherSideRow, inputRow, cRowWrapper)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
new file mode 100644
index 0000000..a595712
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only used for left or right join without
+  * non-equal predicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the name of the generated join function (no non-equi condition)
+  * @param genJoinFuncCode the code of the generated join function (no non-equi condition)
+  * @param isLeftJoin      true if this is a left join, false if it is a right join
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftRightJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowOuterJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    isLeftJoin,
+    queryConfig) {
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    val joinType = if (isLeftJoin) "Left" else "Right"
+    LOG.debug(s"Instantiating NonWindow${joinType}OuterJoin")
+  }
+
+  /**
+    * Puts or retracts an element from the input stream into state and searches the other state to
+    * output records that meet the condition. The input row will be preserved and padded with nulls if
+    * there is no match. Records will be expired in state if state retention time has been
+    * specified.
+    */
+  override def processElement(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      recordFromLeft: Boolean): Unit = {
+
+    val inputRow = value.row
+    updateCurrentSide(value, ctx, timerState, currentSideState)
+
+    cRowWrapper.reset()
+    cRowWrapper.setCollector(out)
+    cRowWrapper.setChange(value.change)
+
+    // join other side data
+    if (recordFromLeft == isLeftJoin) {
+      preservedJoin(inputRow, recordFromLeft, otherSideState)
+    } else {
+      retractJoin(value, recordFromLeft, currentSideState, otherSideState)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
new file mode 100644
index 0000000..f3a499a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only used for left or right join with non-equal
+  * predicates. A MapState of type [Row, Long] is used to record how many rows match the
+  * specified row. Left and right joins without non-equal predicates don't need it because rows
+  * from one side can always join rows from the other side as long as the join keys are the same.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the name of the generated function for the non-equi condition
+  * @param genJoinFuncCode the code of the generated function for the non-equi condition
+  * @param isLeftJoin      true if this is a left join, false if it is a right join
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftRightJoinWithNonEquiPredicates(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowOuterJoinWithNonEquiPredicates(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    isLeftJoin,
+    queryConfig) {
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    val joinType = if (isLeftJoin) "Left" else "Right"
+    LOG.debug(s"Instantiating NonWindow${joinType}JoinWithNonEquiPredicates.")
+  }
+
+  /**
+    * Puts or retracts an element from the input stream into state and searches the other state
+    * to output records that meet the condition. The result is NULL from the other side if
+    * there is no match. Records expire from state if a state retention time has been specified.
+    */
+  override def processElement(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      recordFromLeft: Boolean): Unit = {
+
+    val currentJoinCntState = getJoinCntState(joinCntState, recordFromLeft)
+    val inputRow = value.row
+    val cntAndExpiredTime = updateCurrentSide(value, ctx, timerState, currentSideState)
+    if (!value.change && cntAndExpiredTime.f0 <= 0 && recordFromLeft == isLeftJoin) {
+      currentJoinCntState.remove(inputRow)
+    }
+
+    cRowWrapper.reset()
+    cRowWrapper.setCollector(out)
+    cRowWrapper.setChange(value.change)
+    // join other side data
+    if (recordFromLeft == isLeftJoin) {
+      val joinCnt = preservedJoin(inputRow, recordFromLeft, otherSideState)
+      // initialize the matched cnt only when the row cnt changes from 0 to 1. Each time a new
+      // record from the other side is encountered, joinCnt is updated as well.
+      if (cntAndExpiredTime.f0 == 1 && value.change) {
+        currentJoinCntState.put(inputRow, joinCnt)
+      }
+    } else {
+      val otherSideJoinCntState = getJoinCntState(joinCntState, !recordFromLeft)
+      retractJoinWithNonEquiPreds(value, recordFromLeft, otherSideState, otherSideJoinCntState)
+    }
+  }
+
+  /**
+    * Removes expired records from state. Registers a new timer if the state still holds
+    * records after the clean-up. Also clears the joinCnt map state when clearing rowMapState.
+    */
+  override def expireOutTimeRow(
+      curTime: Long,
+      rowMapState: MapState[Row, JTuple2[Long, Long]],
+      timerState: ValueState[Long],
+      isLeft: Boolean,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+    expireOutTimeRow(curTime, rowMapState, timerState, isLeft, joinCntState, ctx)
+  }
+}
+
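
When the predicate additionally contains a non-equi condition, the planner picks the class
above instead, and the per-row joinCnt MapState drives the matched/unmatched transitions
handled by retractJoinWithNonEquiPreds. A hedged SQL sketch of a query taking this path,
assuming tables A(a, b) and B(aa, c) are already registered in tEnv; the names are
illustrative:

    // a = aa is the equi-part, b < c the non-equi part of the join predicate
    val result = tEnv.sqlQuery(
      """
        |SELECT a, b, c
        |FROM A LEFT JOIN B
        |ON a = aa AND b < c
      """.stripMargin)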

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
new file mode 100644
index 0000000..73b4b5b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer joins.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function name of the non-equi join condition
+  * @param genJoinFuncCode the function code of the non-equi join condition
+  * @param isLeftJoin      the type of join, true for a left join, false for a right join
+  * @param queryConfig     the query configuration
+  */
+abstract class NonWindowOuterJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there are no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there are no matched rows.
+  protected var rightResultRow: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    leftResultRow = new Row(resultType.getArity)
+    rightResultRow = new Row(resultType.getArity)
+    LOG.debug("Instantiating NonWindowOuterJoin.")
+  }
+
+  /**
+    * Join current row with other side rows. Preserve the current row if there are no matched
+    * rows from the other side. The RowWrapper has been reset before preservedJoin is called
+    * and we assume that the current change of cRowWrapper is equal to value.change.
+    *
+    * @param inputRow         the input row
+    * @param inputRowFromLeft the flag indicating whether the input row is from the left
+    * @param otherSideState   the other side state
+    * @return the number of matched rows
+    */
+  def preservedJoin(
+      inputRow: Row,
+      inputRowFromLeft: Boolean,
+      otherSideState: MapState[Row, JTuple2[Long, Long]]): Long = {
+
+    val otherSideIterator = otherSideState.iterator()
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val otherSideRow = otherSideEntry.getKey
+      val otherSideCntAndExpiredTime = otherSideEntry.getValue
+      // join
+      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+      callJoinFunction(inputRow, inputRowFromLeft, otherSideRow, cRowWrapper)
+      // clear expired data. Note: clear after join to keep closer to the original semantics
+      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
+        otherSideIterator.remove()
+      }
+    }
+    val joinCnt = cRowWrapper.getEmitCnt
+    // The result is NULL from the other side if there is no match.
+    if (joinCnt == 0) {
+      cRowWrapper.setTimes(1)
+      collectAppendNull(inputRow, inputRowFromLeft, cRowWrapper)
+    }
+    joinCnt
+  }
+
+  /**
+    * Join current row with other side rows. Retract the previous output row if the matched
+    * condition changed, i.e., it changed from matched to unmatched or vice versa. The
+    * RowWrapper has been reset before retractJoin is called and we assume that the current
+    * change of cRowWrapper is equal to value.change.
+    */
+  def retractJoin(
+      value: CRow,
+      inputRowFromLeft: Boolean,
+      currentSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideState: MapState[Row, JTuple2[Long, Long]]): Unit = {
+
+    val inputRow = value.row
+    val otherSideIterator = otherSideState.iterator()
+    // approximate number of records on the current side. We only check whether the number
+    // equals 0, 1, or is bigger.
+    val recordNum: Long = approxiRecordNumInState(currentSideState)
+
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val otherSideRow = otherSideEntry.getKey
+      val otherSideCntAndExpiredTime = otherSideEntry.getValue
+      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+
+      // retract the previously preserved record padded with nulls
+      if (recordNum == 1 && value.change) {
+        cRowWrapper.setChange(false)
+        collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+        cRowWrapper.setChange(true)
+      }
+      // do normal join
+      callJoinFunction(inputRow, inputRowFromLeft, otherSideRow, cRowWrapper)
+
+      // output the preserved record padded with nulls if necessary
+      if (!value.change && recordNum == 0) {
+        cRowWrapper.setChange(true)
+        collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+      }
+      // clear expired data. Note: clear after join to keep closer to the original semantics
+      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
+        otherSideIterator.remove()
+      }
+    }
+  }
+
+  /**
+    * Returns the approximate number of records in the corresponding state. Only checks whether
+    * the record number is 0, 1, or bigger.
+    */
+  def approxiRecordNumInState(currentSideState: MapState[Row, JTuple2[Long, Long]]): Long = {
+    var recordNum = 0L
+    val it = currentSideState.iterator()
+    while (it.hasNext && recordNum < 2) {
+      recordNum += it.next().getValue.f0
+    }
+    recordNum
+  }
+
+  /**
+    * Appends the input row with default null values if there is no match and collects it.
+    */
+  def collectAppendNull(
+      inputRow: Row,
+      inputFromLeft: Boolean,
+      out: Collector[Row]): Unit = {
+
+    var i = 0
+    if (inputFromLeft) {
+      while (i < inputRow.getArity) {
+        leftResultRow.setField(i, inputRow.getField(i))
+        i += 1
+      }
+      out.collect(leftResultRow)
+    } else {
+      while (i < inputRow.getArity) {
+        val idx = rightResultRow.getArity - inputRow.getArity + i
+        rightResultRow.setField(idx, inputRow.getField(i))
+        i += 1
+      }
+      out.collect(rightResultRow)
+    }
+  }
+}
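
collectAppendNull relies on leftResultRow and rightResultRow being allocated with
resultType.getArity fields that default to null, so only the fields of the preserved side have
to be copied in; for a right-side row the copy is shifted to the tail of the output row. A
standalone sketch of the same padding idea, simplified and not the runtime class itself:

    import org.apache.flink.types.Row

    // pad a left-side row to the full output arity; the right fields stay null,
    // mirroring what collectAppendNull does when inputFromLeft is true
    def padLeft(left: Row, resultArity: Int): Row = {
      val out = new Row(resultArity) // all fields start as null
      var i = 0
      while (i < left.getArity) {
        out.setField(i, left.getField(i))
        i += 1
      }
      out
    }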

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
new file mode 100644
index 0000000..315fa0d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer joins
+  * with non-equal predicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function name of the non-equi join condition
+  * @param genJoinFuncCode the function code of the non-equi join condition
+  * @param isLeftJoin      the type of join, true for a left join, false for a right join
+  * @param queryConfig     the query configuration
+  */
+abstract class NonWindowOuterJoinWithNonEquiPredicates(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowOuterJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    isLeftJoin,
+    queryConfig) {
+
+  // how many rows from the other side match each row of the preserved side. Index 0 holds the
+  // state for the left stream, index 1 the state for the right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  protected var countingCollector: CountingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+
+    leftResultRow = new Row(resultType.getArity)
+    rightResultRow = new Row(resultType.getArity)
+
+    joinCntState = new Array[MapState[Row, Long]](2)
+    val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+    val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+    countingCollector = new CountingCollector()
+    LOG.debug("Instantiating NonWindowOuterJoinWithNonEquiPredicates.")
+  }
+
+  /**
+    * Join current row with other side rows when the join contains non-equal predicates.
+    * Retract the previous output row if the matched condition changed, i.e., it changed from
+    * matched to unmatched or vice versa. The RowWrapper has been reset before this method is
+    * called and we assume that the current change of cRowWrapper is equal to value.change.
+    */
+  def retractJoinWithNonEquiPreds(
+      value: CRow,
+      inputRowFromLeft: Boolean,
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      otherSideJoinCntState: MapState[Row, Long]): Unit = {
+
+    val inputRow = value.row
+    val otherSideIterator = otherSideState.iterator()
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val otherSideRow = otherSideEntry.getKey
+      val otherSideCntAndExpiredTime = otherSideEntry.getValue
+
+      countingCollector.reset()
+      callJoinFunction(inputRow, inputRowFromLeft, otherSideRow, countingCollector)
+      if (countingCollector.getEmitCnt > 0) {
+        cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+        val joinCnt = otherSideJoinCntState.get(otherSideRow)
+        if (value.change) {
+          otherSideJoinCntState.put(otherSideRow, joinCnt + 1L)
+          if (joinCnt == 0) {
+            // retract the previous non-matched result row
+            cRowWrapper.setChange(false)
+            collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+            cRowWrapper.setChange(true)
+          }
+          // do normal join
+          callJoinFunction(inputRow, inputRowFromLeft, otherSideRow, cRowWrapper)
+        } else {
+          otherSideJoinCntState.put(otherSideRow, joinCnt - 1L)
+          // do normal join
+          callJoinFunction(inputRow, inputRowFromLeft, otherSideRow, cRowWrapper)
+          if (joinCnt == 1) {
+            // output the non-matched result row
+            cRowWrapper.setChange(true)
+            collectAppendNull(otherSideRow, !inputRowFromLeft, cRowWrapper)
+          }
+        }
+      }
+      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
+        otherSideIterator.remove()
+      }
+    }
+  }
+
+  /**
+    * Removes expired records from state. Registers a new timer if the state still holds
+    * records after the clean-up. Also clears the joinCnt map state when clearing rowMapState.
+    */
+  def expireOutTimeRow(
+      curTime: Long,
+      rowMapState: MapState[Row, JTuple2[Long, Long]],
+      timerState: ValueState[Long],
+      isLeft: Boolean,
+      joinCntState: Array[MapState[Row, Long]],
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+    val currentJoinCntState = getJoinCntState(joinCntState, isLeft)
+    val rowMapIter = rowMapState.iterator()
+    var validTimestamp: Boolean = false
+
+    while (rowMapIter.hasNext) {
+      val mapEntry = rowMapIter.next()
+      val recordExpiredTime = mapEntry.getValue.f1
+      if (recordExpiredTime <= curTime) {
+        rowMapIter.remove()
+        currentJoinCntState.remove(mapEntry.getKey)
+      } else {
+        // we found a timestamp that is still valid
+        validTimestamp = true
+      }
+    }
+    // If the state has non-expired timestamps, register a new timer.
+    // Otherwise clean the complete state for this input.
+    if (validTimestamp) {
+      val cleanupTime = curTime + maxRetentionTime
+      ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+      timerState.update(cleanupTime)
+    } else {
+      timerState.clear()
+      rowMapState.clear()
+      if (isLeft == isLeftJoin) {
+        currentJoinCntState.clear()
+      }
+    }
+  }
+
+  /**
+    * Gets the left or right join cnt state.
+    *
+    * @param joinCntState   the join cnt state array; index 0 is the left join cnt state,
+    *                       index 1 the right one
+    * @param isLeftCntState the flag indicating whether to get the left join cnt state
+    * @return the corresponding join cnt state
+    */
+  def getJoinCntState(
+      joinCntState: Array[MapState[Row, Long]],
+      isLeftCntState: Boolean)
+    : MapState[Row, Long] = {
+
+    if (isLeftCntState) {
+      joinCntState(0)
+    } else {
+      joinCntState(1)
+    }
+  }
+}
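
The clean-up in expireOutTimeRow only runs when stateCleaningEnabled is true, i.e. when an
idle state retention interval has been configured for the query. A minimal sketch of enabling
it, assuming StreamQueryConfig's withIdleStateRetentionTime; the interval values are
illustrative:

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // keep per-key join state between 12 and 24 hours of inactivity; expired rows
    // and their joinCnt entries are removed by the registered timers
    val qConfig = tEnv.queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))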

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
index 1c36169..3f9c401 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
@@ -62,4 +62,20 @@ class TableSinkValidationTest extends TableTestBase {
     // must fail because table is updating table without full key
     env.execute()
   }
+
+  @Test(expected = classOf[TableException])
+  def testAppendSinkOnLeftJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h)
+      .select('c, 'g)
+      .writeToSink(new TestAppendSink)
+
+    // must fail because table is not append-only
+    env.execute()
+  }
 }
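
The new test documents that an append-only sink cannot consume the result of a left join. A
hedged sketch of a working alternative for the same query, converting to a retract stream
instead of writing to TestAppendSink; Row is used as the output type and the names follow the
test above:

    import org.apache.flink.types.Row

    val retractStream = ds1
      .leftOuterJoin(ds2, 'a === 'd && 'b === 'h)
      .select('c, 'g)
      .toRetractStream[Row]

    retractStream.print()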

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index ff3fdf9..6a8b686 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.nodes.datastream._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.CountDistinct
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Assert._
@@ -306,8 +307,87 @@ class RetractionRulesTest extends TableTestBase {
       )
     util.verifyTableTrait(resultTable, expected)
   }
-}
 
+  @Test
+  def testInnerJoinWithoutAgg(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, Int)]('bb, 'c)
+
+    val resultTable = lTable
+      .join(rTable)
+      .where('b === 'bb)
+      .select('a, 'b, 'c)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          "DataStreamScan(true, Acc)",
+          "DataStreamScan(true, Acc)",
+          "false, Acc"
+        ),
+        "false, Acc"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+    val resultTable = lTable
+      .leftOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          "DataStreamScan(true, Acc)",
+          "DataStreamScan(true, Acc)",
+          "false, AccRetract"
+        ),
+        "false, AccRetract"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testAggFollowedWithLeftJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+    val countDistinct = new CountDistinct
+    val resultTable = lTable
+      .leftOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+      .groupBy('a)
+      .select('a, countDistinct('c))
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          binaryNode(
+            "DataStreamJoin",
+            "DataStreamScan(true, Acc)",
+            "DataStreamScan(true, Acc)",
+            "true, AccRetract"
+          ),
+          "true, AccRetract"
+        ),
+        "false, Acc"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+}
 
 class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
index aef6443..e0cbba5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
@@ -123,6 +123,27 @@ class UpdatingPlanCheckerTest {
   }
 
   @Test
+  def testForwardBothKeysForLeftJoin1(): Unit = {
+    val util = new UpdatePlanCheckerUtil()
+    val table = util.addTable[(Int, Int)]('pk, 'a)
+
+    val lTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'l1, 'pk as 'l2, 'pk as 'l3, 'a.max as 'l4, 'a.min as 'l5)
+
+    val rTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'r2, 'pk as 'r3, 'a.max as 'r1, 'a.min as 'r4, 'a.min as 'r5)
+
+    val resultTable = lTableWithPk
+      .leftOuterJoin(rTableWithPk)
+      .where('l2 === 'r2 && 'l4 === 'r3 && 'l4 === 'r5 && 'l5 === 'r4)
+      .select('l1, 'l2, 'l3, 'l4, 'l5, 'r1, 'r2, 'r3, 'r4, 'r5)
+
+    util.verifyTableUniqueKey(resultTable, Seq("l1", "l2", "l3", "l4", "r2", "r3", "r5"))
+  }
+
+  @Test
   def testForwardBothKeysForJoin2(): Unit = {
     val util = new UpdatePlanCheckerUtil()
     val table = util.addTable[(Int, Int)]('pk, 'a)
@@ -144,6 +165,27 @@ class UpdatingPlanCheckerTest {
   }
 
   @Test
+  def testForwardBothKeysForLeftJoin2(): Unit = {
+    val util = new UpdatePlanCheckerUtil()
+    val table = util.addTable[(Int, Int)]('pk, 'a)
+
+    val lTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'l1, 'pk as 'l2, 'pk as 'l3, 'a.max as 'l4, 'a.min as 'l5)
+
+    val rTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'r2, 'pk as 'r3, 'a.max as 'r1, 'a.min as 'r4, 'a.count as 'r5)
+
+    val resultTable = lTableWithPk
+      .leftOuterJoin(rTableWithPk)
+      .where('l5 === 'r4)
+      .select('l1, 'l2, 'l3, 'l4, 'l5, 'r1, 'r2, 'r3, 'r4, 'r5)
+
+    util.verifyTableUniqueKey(resultTable, Seq("l1", "l2", "l3", "r2", "r3"))
+  }
+
+  @Test
   def testJoinKeysEqualsLeftKeys(): Unit = {
     val util = new UpdatePlanCheckerUtil()
     val table = util.addTable[(Int, Int)]('pk, 'a)
@@ -165,6 +207,27 @@ class UpdatingPlanCheckerTest {
   }
 
   @Test
+  def testJoinKeysEqualsLeftKeysForLeftJoin(): Unit = {
+    val util = new UpdatePlanCheckerUtil()
+    val table = util.addTable[(Int, Int)]('pk, 'a)
+
+    val lTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'leftpk, 'a.max as 'lefta)
+
+    val rTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'rightpk, 'a.max as 'righta)
+
+    val resultTable = lTableWithPk
+      .leftOuterJoin(rTableWithPk)
+      .where('leftpk === 'righta)
+      .select('rightpk, 'lefta, 'righta)
+
+    util.verifyTableUniqueKey(resultTable, Seq("rightpk", "righta"))
+  }
+
+  @Test
   def testJoinKeysEqualsRightKeys(): Unit = {
     val util = new UpdatePlanCheckerUtil()
     val table = util.addTable[(Int, Int)]('pk, 'a)
@@ -185,6 +248,27 @@ class UpdatingPlanCheckerTest {
     util.verifyTableUniqueKey(resultTable, Seq("leftpk", "lefta"))
   }
 
+  @Test
+  def testJoinKeysEqualsRightKeysForLeftJoin(): Unit = {
+    val util = new UpdatePlanCheckerUtil()
+    val table = util.addTable[(Int, Int)]('pk, 'a)
+
+    val lTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'leftpk, 'a.max as 'lefta)
+
+    val rTableWithPk = table
+      .groupBy('pk)
+      .select('pk as 'rightpk, 'a.max as 'righta)
+
+    val resultTable = lTableWithPk
+      .join(rTableWithPk)
+      .where('lefta === 'rightpk)
+      .select('leftpk, 'lefta, 'righta)
+
+    util.verifyTableUniqueKey(resultTable, Seq("leftpk", "lefta"))
+  }
+
 
   @Test
   def testNonKeysJoin(): Unit = {
@@ -204,6 +288,24 @@ class UpdatingPlanCheckerTest {
 
     util.verifyTableUniqueKey(resultTable, Nil)
   }
+
+  @Test
+  def testNonKeysJoin2(): Unit = {
+    val util = new UpdatePlanCheckerUtil()
+    val table = util.addTable[(Int, Int)]('a, 'b)
+
+    val lTable = table
+      .select('a as 'a, 'b as 'b)
+
+    val rTable = table
+      .select('a as 'aa, 'b as 'bb)
+
+    val resultTable = lTable
+      .leftOuterJoin(rTable, 'a === 'aa)
+      .select('a, 'aa, 'b, 'bb)
+
+    util.verifyTableUniqueKey(resultTable, Nil)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 14c9859..644478b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -39,7 +39,7 @@ class JoinITCase(
   extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
-  def testJoin(): Unit = {
+  def testInnerJoin(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
@@ -57,7 +57,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithFilter(): Unit = {
+  def testInnerJoinWithFilter(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -97,7 +97,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithMultipleKeys(): Unit = {
+  def testInnerJoinWithMultipleKeys(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -118,7 +118,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithAlias(): Unit = {
+  def testInnerJoinWithAlias(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -140,7 +140,7 @@ class JoinITCase(
   }
 
   @Test
-  def testDataSetJoinWithAggregation(): Unit = {
+  def testInnerJoinWithAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -160,7 +160,7 @@ class JoinITCase(
   }
 
   @Test
-  def testTableJoinWithAggregation(): Unit = {
+  def testInnerJoinWithAggregation2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)