You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/09 12:37:06 UTC

[2/3] flink git commit: [FLINK-7797] [table] Add support for windowed outer joins for streaming tables.

[FLINK-7797] [table] Add support for windowed outer joins for streaming tables.

This closes #5140.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/222e6945
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/222e6945
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/222e6945

Branch: refs/heads/master
Commit: 222e6945b795cf45ee5aa5228fb207e980854046
Parents: e6fbfdc
Author: Xingcan Cui <xi...@gmail.com>
Authored: Fri Dec 8 01:28:40 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 9 12:05:05 2018 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |   2 -
 docs/dev/table/tableApi.md                      |   4 -
 .../nodes/datastream/DataStreamWindowJoin.scala | 170 +++---
 .../table/runtime/join/EmitAwareCollector.scala |  54 ++
 .../runtime/join/OuterJoinPaddingUtil.scala     |  71 +++
 .../join/ProcTimeBoundedStreamInnerJoin.scala   |  67 ---
 .../join/ProcTimeBoundedStreamJoin.scala        |  70 +++
 .../join/RowTimeBoundedStreamInnerJoin.scala    |  82 ---
 .../runtime/join/RowTimeBoundedStreamJoin.scala |  85 +++
 .../join/TimeBoundedStreamInnerJoin.scala       | 424 ---------------
 .../runtime/join/TimeBoundedStreamJoin.scala    | 535 +++++++++++++++++++
 .../flink/table/api/stream/sql/JoinTest.scala   | 275 ++++++++++
 .../flink/table/api/stream/table/JoinTest.scala | 326 +++++++++--
 .../table/runtime/harness/JoinHarnessTest.scala | 526 +++++++++++++++---
 .../table/runtime/stream/sql/JoinITCase.scala   | 507 +++++++++++++++++-
 15 files changed, 2404 insertions(+), 794 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 04d6e84..d9a4ce7 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -408,8 +408,6 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
           <li><code>ltime &gt;= rtime AND ltime &lt; rtime + INTERVAL '10' MINUTE</code></li>
           <li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
         </ul>
-                
-        <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight sql %}
 SELECT *

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 2b58a62..3b31325 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -535,8 +535,6 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
           <li><code>ltime &gt;= rtime &amp;&amp; ltime &lt; rtime + 10.minutes</code></li>
         </ul>
         
-        <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
-
 {% highlight java %}
 Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
 Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
@@ -652,8 +650,6 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
           <li><code>'ltime === 'rtime</code></li>
           <li><code>'ltime &gt;= 'rtime &amp;&amp; 'ltime &lt; 'rtime + 10.minutes</code></li>
         </ul>
-        
-        <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight scala %}
 val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 27f2c74..94402672 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -23,17 +23,21 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
-import org.apache.flink.table.runtime.CRowKeySelector
-import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.{CRowKeySelector, CRowWrappingCollector}
+import org.apache.flink.table.runtime.join.{OuterJoinPaddingUtil, ProcTimeBoundedStreamJoin, RowTimeBoundedStreamJoin, WindowJoinUtil}
 import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.util.Logging
@@ -142,71 +146,98 @@ class DataStreamWindowJoin(
         s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
         s"join: (${joinSelectionToString(schema.relDataType)})"
 
-    joinType match {
-      case JoinRelType.INNER =>
-        if (relativeWindowSize < 0) {
-          LOG.warn(s"The relative window size $relativeWindowSize is negative," +
-            " please check the join conditions.")
-          createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
-        } else {
-          if (isRowTime) {
-            createRowTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          } else {
-            createProcTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          }
-        }
-      case JoinRelType.FULL =>
-        throw new TableException(
-          "Full join between stream and stream is not supported yet.")
-      case JoinRelType.LEFT =>
-        throw new TableException(
-          "Left join between stream and stream is not supported yet.")
-      case JoinRelType.RIGHT =>
-        throw new TableException(
-          "Right join between stream and stream is not supported yet.")
+    val flinkJoinType = joinType match {
+      case JoinRelType.INNER => JoinType.INNER
+      case JoinRelType.FULL => JoinType.FULL_OUTER
+      case JoinRelType.LEFT => JoinType.LEFT_OUTER
+      case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+    }
+
+    if (relativeWindowSize < 0) {
+      LOG.warn(s"The relative window size $relativeWindowSize is negative," +
+        " please check the join conditions.")
+      createNegativeWindowSizeJoin(
+        flinkJoinType,
+        leftDataStream,
+        rightDataStream,
+        leftSchema.arity,
+        rightSchema.arity,
+        returnTypeInfo)
+    } else {
+      if (isRowTime) {
+        createRowTimeJoin(
+          flinkJoinType,
+          leftDataStream,
+          rightDataStream,
+          returnTypeInfo,
+          joinOpName,
+          joinFunction.name,
+          joinFunction.code,
+          leftKeys,
+          rightKeys
+        )
+      } else {
+        createProcTimeJoin(
+          flinkJoinType,
+          leftDataStream,
+          rightDataStream,
+          returnTypeInfo,
+          joinOpName,
+          joinFunction.name,
+          joinFunction.code,
+          leftKeys,
+          rightKeys
+        )
+      }
     }
   }
 
-  def createEmptyInnerJoin(
+  def createNegativeWindowSizeJoin(
+      joinType: JoinType,
       leftDataStream: DataStream[CRow],
       rightDataStream: DataStream[CRow],
+      leftArity: Int,
+      rightArity: Int,
       returnTypeInfo: TypeInformation[CRow]): DataStream[CRow] = {
-    leftDataStream.connect(rightDataStream).process(
-      new CoProcessFunction[CRow, CRow, CRow] {
-        override def processElement1(
-          value: CRow,
-          ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-          out: Collector[CRow]): Unit = {
-          //Do nothing.
-        }
-        override def processElement2(
-          value: CRow,
-          ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-          out: Collector[CRow]): Unit = {
-          //Do nothing.
-        }
-      }).returns(returnTypeInfo)
+    // We filter all records instead of adding an empty source to preserve the watermarks.
+    val allFilter = new FlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
+      override def flatMap(value: CRow, out: Collector[CRow]): Unit = { }
+      override def getProducedType: TypeInformation[CRow] = returnTypeInfo
+    }
+
+    val leftPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
+      val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
+      override def map(value: CRow): CRow = new CRow(paddingUtil.padLeft(value.row), true)
+      override def getProducedType: TypeInformation[CRow] = returnTypeInfo
+    }
+
+    val rightPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
+      val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
+      override def map(value: CRow): CRow = new CRow(paddingUtil.padRight(value.row), true)
+      override def getProducedType: TypeInformation[CRow] = returnTypeInfo
+    }
+
+    val leftP = leftDataStream.getParallelism
+    val rightP = rightDataStream.getParallelism
+
+    joinType match {
+      case JoinType.INNER =>
+        leftDataStream.flatMap(allFilter).name("Empty Inner Join").setParallelism(leftP)
+          .union(rightDataStream.flatMap(allFilter).name("Empty Inner Join").setParallelism(rightP))
+      case JoinType.LEFT_OUTER =>
+        leftDataStream.map(leftPadder).name("Left Outer Join").setParallelism(leftP)
+          .union(rightDataStream.flatMap(allFilter).name("Left Outer Join").setParallelism(rightP))
+      case JoinType.RIGHT_OUTER =>
+        leftDataStream.flatMap(allFilter).name("Right Outer Join").setParallelism(leftP)
+          .union(rightDataStream.map(rightPadder).name("Right Outer Join").setParallelism(rightP))
+      case JoinType.FULL_OUTER =>
+        leftDataStream.map(leftPadder).name("Full Outer Join").setParallelism(leftP)
+          .union(rightDataStream.map(rightPadder).name("Full Outer Join").setParallelism(rightP))
+    }
   }
 
-  def createProcTimeInnerJoin(
+  def createProcTimeJoin(
+      joinType: JoinType,
       leftDataStream: DataStream[CRow],
       rightDataStream: DataStream[CRow],
       returnTypeInfo: TypeInformation[CRow],
@@ -216,7 +247,8 @@ class DataStreamWindowJoin(
       leftKeys: Array[Int],
       rightKeys: Array[Int]): DataStream[CRow] = {
 
-    val procInnerJoinFunc = new ProcTimeBoundedStreamInnerJoin(
+    val procJoinFunc = new ProcTimeBoundedStreamJoin(
+      joinType,
       leftLowerBound,
       leftUpperBound,
       leftSchema.typeInfo,
@@ -229,13 +261,13 @@ class DataStreamWindowJoin(
         .keyBy(
           new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)),
           new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys)))
-        .process(procInnerJoinFunc)
+        .process(procJoinFunc)
         .name(operatorName)
         .returns(returnTypeInfo)
     } else {
       leftDataStream.connect(rightDataStream)
         .keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow]())
-        .process(procInnerJoinFunc)
+        .process(procJoinFunc)
         .setParallelism(1)
         .setMaxParallelism(1)
         .name(operatorName)
@@ -243,7 +275,8 @@ class DataStreamWindowJoin(
     }
   }
 
-  def createRowTimeInnerJoin(
+  def createRowTimeJoin(
+      joinType: JoinType,
       leftDataStream: DataStream[CRow],
       rightDataStream: DataStream[CRow],
       returnTypeInfo: TypeInformation[CRow],
@@ -253,7 +286,8 @@ class DataStreamWindowJoin(
       leftKeys: Array[Int],
       rightKeys: Array[Int]): DataStream[CRow] = {
 
-    val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin(
+    val rowTimeJoinFunc = new RowTimeBoundedStreamJoin(
+      joinType,
       leftLowerBound,
       leftUpperBound,
       allowedLateness = 0L,
@@ -274,8 +308,8 @@ class DataStreamWindowJoin(
           operatorName,
           returnTypeInfo,
           new KeyedCoProcessOperatorWithWatermarkDelay[Tuple, CRow, CRow, CRow](
-            rowTimeInnerJoinFunc,
-            rowTimeInnerJoinFunc.getMaxOutputDelay)
+            rowTimeJoinFunc,
+            rowTimeJoinFunc.getMaxOutputDelay)
         )
     } else {
       leftDataStream.connect(rightDataStream)
@@ -284,8 +318,8 @@ class DataStreamWindowJoin(
           operatorName,
           returnTypeInfo,
           new KeyedCoProcessOperatorWithWatermarkDelay[java.lang.Byte, CRow, CRow, CRow](
-            rowTimeInnerJoinFunc,
-            rowTimeInnerJoinFunc.getMaxOutputDelay)
+            rowTimeJoinFunc,
+            rowTimeJoinFunc.getMaxOutputDelay)
         )
         .setParallelism(1)
         .setMaxParallelism(1)

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala
new file mode 100644
index 0000000..f11ffa3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to wrap a Row into a [[CRow]] and to track whether a row has been emitted by the inner
+  * collector.
+  */
+class EmitAwareCollector extends Collector[Row]{
+
+  var emitted = false
+  var innerCollector: Collector[CRow] = _
+  private val cRow: CRow = new CRow()
+
+  /** Sets the change value of the CRow. **/
+  def setCRowChange(change: Boolean): Unit = {
+   cRow.change = change
+  }
+
+  /** Resets the emitted flag. **/
+  def reset(): Unit = {
+    emitted = false
+  }
+
+  override def collect(record: Row): Unit = {
+    emitted = true
+    cRow.row = record
+    innerCollector.collect(cRow)
+  }
+
+  override def close(): Unit = {
+    innerCollector.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala
new file mode 100644
index 0000000..6f850a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.types.Row
+
+/**
+  * A utility to generate reusable padding results for outer joins.
+  */
+class OuterJoinPaddingUtil(leftArity: Int, rightArity: Int) extends java.io.Serializable{
+
+  private val resultArity = leftArity + rightArity
+  private val leftNullPaddingResult = new Row(resultArity)
+  private val rightNullPaddingResult = new Row(resultArity)
+
+  // Initialize the two reusable padding results.
+  var i = 0
+  while (i < leftArity) {
+    leftNullPaddingResult.setField(i, null)
+    i = i + 1
+  }
+  i = 0
+  while (i < rightArity) {
+    rightNullPaddingResult.setField(i + leftArity, null)
+    i = i + 1
+  }
+
+  /**
+    * Returns the reusable result row holding the given right row with null-padded left fields.
+    * @param rightRow the right row to pad
+    * @return the reusable null padding result
+    */
+  final def padRight(rightRow: Row): Row = {
+    var i = 0
+    while (i < rightArity) {
+      leftNullPaddingResult.setField(leftArity + i, rightRow.getField(i))
+      i = i + 1
+    }
+    leftNullPaddingResult
+  }
+
+  /**
+    * Returns the reusable result row holding the given left row with null-padded right fields.
+    * @param leftRow the left row to pad
+    * @return the reusable null padding result
+    */
+  final def padLeft(leftRow: Row): Row = {
+    var i = 0
+    while (i < leftArity) {
+      rightNullPaddingResult.setField(i, leftRow.getField(i))
+      i = i + 1
+    }
+    rightNullPaddingResult
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
deleted file mode 100644
index 3bac42c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.join
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-
-/**
-  * The function to execute processing time bounded stream inner-join.
-  */
-final class ProcTimeBoundedStreamInnerJoin(
-    leftLowerBound: Long,
-    leftUpperBound: Long,
-    leftType: TypeInformation[Row],
-    rightType: TypeInformation[Row],
-    genJoinFuncName: String,
-    genJoinFuncCode: String)
-  extends TimeBoundedStreamInnerJoin(
-    leftLowerBound,
-    leftUpperBound,
-    allowedLateness = 0L,
-    leftType,
-    rightType,
-    genJoinFuncName,
-    genJoinFuncCode) {
-
-  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
-    leftOperatorTime = ctx.timerService().currentProcessingTime()
-    rightOperatorTime = leftOperatorTime
-  }
-
-  override def getTimeForLeftStream(
-      context: CoProcessFunction[CRow, CRow, CRow]#Context,
-      row: Row): Long = {
-    leftOperatorTime
-  }
-
-  override def getTimeForRightStream(
-      context: CoProcessFunction[CRow, CRow, CRow]#Context,
-      row: Row): Long = {
-    rightOperatorTime
-  }
-
-  override def registerTimer(
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      cleanupTime: Long): Unit = {
-    ctx.timerService.registerProcessingTimeTimer(cleanupTime)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala
new file mode 100644
index 0000000..94f06d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute processing time bounded stream joins (inner and outer).
+  */
+final class ProcTimeBoundedStreamJoin(
+    joinType: JoinType,
+    leftLowerBound: Long,
+    leftUpperBound: Long,
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    genJoinFuncName: String,
+    genJoinFuncCode: String)
+  extends TimeBoundedStreamJoin(
+    joinType,
+    leftLowerBound,
+    leftUpperBound,
+    allowedLateness = 0L,
+    leftType,
+    rightType,
+    genJoinFuncName,
+    genJoinFuncCode) {
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+    leftOperatorTime = ctx.timerService().currentProcessingTime()
+    rightOperatorTime = leftOperatorTime
+  }
+
+  override def getTimeForLeftStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    leftOperatorTime
+  }
+
+  override def getTimeForRightStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    rightOperatorTime
+  }
+
+  override def registerTimer(
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      cleanupTime: Long): Unit = {
+    ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
deleted file mode 100644
index a2d9dca..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.join
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-
-/**
-  * The function to execute row(event) time bounded stream inner-join.
-  */
-final class RowTimeBoundedStreamInnerJoin(
-    leftLowerBound: Long,
-    leftUpperBound: Long,
-    allowedLateness: Long,
-    leftType: TypeInformation[Row],
-    rightType: TypeInformation[Row],
-    genJoinFuncName: String,
-    genJoinFuncCode: String,
-    leftTimeIdx: Int,
-    rightTimeIdx: Int)
-  extends TimeBoundedStreamInnerJoin(
-    leftLowerBound,
-    leftUpperBound,
-    allowedLateness,
-    leftType,
-    rightType,
-    genJoinFuncName,
-    genJoinFuncCode) {
-
-  /**
-    * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
-    * This is the time interval by which watermarks need to be held back.
-    *
-    * @return the maximum delay for the outputs
-    */
-  def getMaxOutputDelay: Long = Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
-
-  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
-    leftOperatorTime =
-      if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark()
-      else 0L
-    // We may set different operator times in the future.
-    rightOperatorTime = leftOperatorTime
-  }
-
-  override def getTimeForLeftStream(
-      context: CoProcessFunction[CRow, CRow, CRow]#Context,
-      row: Row): Long = {
-    row.getField(leftTimeIdx).asInstanceOf[Long]
-  }
-
-  override def getTimeForRightStream(
-      context: CoProcessFunction[CRow, CRow, CRow]#Context,
-      row: Row): Long = {
-    row.getField(rightTimeIdx).asInstanceOf[Long]
-  }
-
-  override def registerTimer(
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      cleanupTime: Long): Unit = {
-    // Maybe we can register timers for different streams in the future.
-    ctx.timerService.registerEventTimeTimer(cleanupTime)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala
new file mode 100644
index 0000000..619b8b4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row (event) time bounded stream joins (inner or left/right/full outer).
+  */
+final class RowTimeBoundedStreamJoin(
+    joinType: JoinType,
+    leftLowerBound: Long,
+    leftUpperBound: Long,
+    allowedLateness: Long,
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    leftTimeIdx: Int,
+    rightTimeIdx: Int)
+  extends TimeBoundedStreamJoin(
+    joinType,
+    leftLowerBound,
+    leftUpperBound,
+    allowedLateness,
+    leftType,
+    rightType,
+    genJoinFuncName,
+    genJoinFuncCode) {
+
+  /**
+    * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
+    * This is the time interval by which watermarks need to be held back.
+    *
+    * @return the maximum delay for the outputs
+    */
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+    leftOperatorTime =
+      if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark()
+      else 0L
+    // We may set different operator times in the future.
+    rightOperatorTime = leftOperatorTime
+  }
+
+  override def getTimeForLeftStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    row.getField(leftTimeIdx).asInstanceOf[Long]
+  }
+
+  override def getTimeForRightStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    row.getField(rightTimeIdx).asInstanceOf[Long]
+  }
+
+  override def registerTimer(
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      cleanupTime: Long): Unit = {
+    // Maybe we can register timers for different streams in the future.
+    ctx.timerService.registerEventTimeTimer(cleanupTime)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
deleted file mode 100644
index 9625eac..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.join
-
-import java.util
-import java.util.{List => JList}
-
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.flink.api.common.state._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ListTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.CRowWrappingCollector
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.table.util.Logging
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-
-/**
-  * A CoProcessFunction to execute time-bounded stream inner-join.
-  * Two kinds of time criteria:
-  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X" where
-  * X and Y might be negative or positive and X <= Y.
-  *
-  * @param leftLowerBound  the lower bound for the left stream (X in the criteria)
-  * @param leftUpperBound  the upper bound for the left stream (Y in the criteria)
-  * @param allowedLateness the lateness allowed for the two streams
-  * @param leftType        the input type of left stream
-  * @param rightType       the input type of right stream
-  * @param genJoinFuncName the name of the generated function
-  * @param genJoinFuncCode the code of function to evaluate the non-window join conditions
-  *
-  */
-abstract class TimeBoundedStreamInnerJoin(
-    private val leftLowerBound: Long,
-    private val leftUpperBound: Long,
-    private val allowedLateness: Long,
-    private val leftType: TypeInformation[Row],
-    private val rightType: TypeInformation[Row],
-    private val genJoinFuncName: String,
-    private val genJoinFuncCode: String)
-  extends CoProcessFunction[CRow, CRow, CRow]
-  with Compiler[FlatJoinFunction[Row, Row, Row]]
-  with Logging {
-
-  private var cRowWrapper: CRowWrappingCollector = _
-
-  // the join function for other conditions
-  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
-
-  // cache to store rows from the left stream
-  private var leftCache: MapState[Long, JList[Row]] = _
-  // cache to store rows from the right stream
-  private var rightCache: MapState[Long, JList[Row]] = _
-
-  // state to record the timer on the left stream. 0 means no timer set
-  private var leftTimerState: ValueState[Long] = _
-  // state to record the timer on the right stream. 0 means no timer set
-  private var rightTimerState: ValueState[Long] = _
-
-  protected val leftRelativeSize: Long = -leftLowerBound
-  protected val rightRelativeSize: Long = leftUpperBound
-
-  // Points in time until which the respective cache has been cleaned.
-  private var leftExpirationTime: Long = 0L
-  private var rightExpirationTime: Long = 0L
-
-  // Current time on the respective input stream.
-  protected var leftOperatorTime: Long = 0L
-  protected var rightOperatorTime: Long = 0L
-
-  // Minimum interval by which state is cleaned up
-  private val minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
-
-  if (allowedLateness < 0) {
-    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
-  }
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
-      s"Code:\n$genJoinFuncCode")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genJoinFuncName,
-      genJoinFuncCode)
-    LOG.debug("Instantiating JoinFunction.")
-    joinFunction = clazz.newInstance()
-
-    cRowWrapper = new CRowWrappingCollector()
-    cRowWrapper.setChange(true)
-
-    // Initialize the data caches.
-    val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType)
-    val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-      new MapStateDescriptor[Long, JList[Row]](
-        "InnerJoinLeftCache",
-        Types.LONG.asInstanceOf[TypeInformation[Long]],
-        leftListTypeInfo)
-    leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
-
-    val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType)
-    val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-      new MapStateDescriptor[Long, JList[Row]](
-        "InnerJoinRightCache",
-        Types.LONG.asInstanceOf[TypeInformation[Long]],
-        rightListTypeInfo)
-    rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
-
-    // Initialize the timer states.
-    val leftTimerStateDesc: ValueStateDescriptor[Long] =
-      new ValueStateDescriptor[Long]("InnerJoinLeftTimerState", classOf[Long])
-    leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
-
-    val rightTimerStateDesc: ValueStateDescriptor[Long] =
-      new ValueStateDescriptor[Long]("InnerJoinRightTimerState", classOf[Long])
-    rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
-  }
-
-  /**
-    * Process rows from the left stream.
-    */
-  override def processElement1(
-      cRowValue: CRow,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
-
-    updateOperatorTime(ctx)
-    val leftRow = cRowValue.row
-    val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
-    val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
-    val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
-    cRowWrapper.out = out
-
-    // Check if we need to cache the current row.
-    if (rightOperatorTime < rightQualifiedUpperBound) {
-      // Operator time of right stream has not exceeded the upper window bound of the current
-      // row. Put it into the left cache, since later coming records from the right stream are
-      // expected to be joined with it.
-      var leftRowList = leftCache.get(timeForLeftRow)
-      if (null == leftRowList) {
-        leftRowList = new util.ArrayList[Row](1)
-      }
-      leftRowList.add(leftRow)
-      leftCache.put(timeForLeftRow, leftRowList)
-      if (rightTimerState.value == 0) {
-        // Register a timer on the RIGHT stream to remove rows.
-        registerCleanUpTimer(ctx, timeForLeftRow, leftRow = true)
-      }
-    }
-    // Check if we need to join the current row against cached rows of the right input.
-    // The condition here should be rightMinimumTime < rightQualifiedUpperBound.
-    // We use rightExpirationTime as an approximation of the rightMinimumTime here,
-    // since rightExpirationTime <= rightMinimumTime is always true.
-    if (rightExpirationTime < rightQualifiedUpperBound) {
-      // Upper bound of current join window has not passed the cache expiration time yet.
-      // There might be qualifying rows in the cache that the current row needs to be joined with.
-      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
-      // Join the leftRow with rows from the right cache.
-      val rightIterator = rightCache.iterator()
-      while (rightIterator.hasNext) {
-        val rightEntry = rightIterator.next
-        val rightTime = rightEntry.getKey
-        if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
-          val rightRows = rightEntry.getValue
-          var i = 0
-          while (i < rightRows.size) {
-            joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
-            i += 1
-          }
-        }
-
-        if (rightTime <= rightExpirationTime) {
-          // eager remove
-          rightIterator.remove()
-        }// We could do the short-cutting optimization here once we get a state with ordered keys.
-      }
-    }
-  }
-
-  /**
-    * Process rows from the right stream.
-    */
-  override def processElement2(
-      cRowValue: CRow,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
-
-    updateOperatorTime(ctx)
-    val rightRow = cRowValue.row
-    val timeForRightRow: Long = getTimeForRightStream(ctx, rightRow)
-    val leftQualifiedLowerBound: Long = timeForRightRow - leftRelativeSize
-    val leftQualifiedUpperBound: Long =  timeForRightRow + rightRelativeSize
-    cRowWrapper.out = out
-
-    // Check if we need to cache the current row.
-    if (leftOperatorTime < leftQualifiedUpperBound) {
-      // Operator time of left stream has not exceeded the upper window bound of the current
-      // row. Put it into the right cache, since later coming records from the left stream are
-      // expected to be joined with it.
-      var rightRowList = rightCache.get(timeForRightRow)
-      if (null == rightRowList) {
-        rightRowList = new util.ArrayList[Row](1)
-      }
-      rightRowList.add(rightRow)
-      rightCache.put(timeForRightRow, rightRowList)
-      if (leftTimerState.value == 0) {
-        // Register a timer on the LEFT stream to remove rows.
-        registerCleanUpTimer(ctx, timeForRightRow, leftRow = false)
-      }
-    }
-    // Check if we need to join the current row against cached rows of the left input.
-    // The condition here should be leftMinimumTime < leftQualifiedUpperBound.
-    // We use leftExpirationTime as an approximation of the leftMinimumTime here,
-    // since leftExpirationTime <= leftMinimumTime is always true.
-    if (leftExpirationTime < leftQualifiedUpperBound) {
-      leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
-      // Join the rightRow with rows from the left cache.
-      val leftIterator = leftCache.iterator()
-      while (leftIterator.hasNext) {
-        val leftEntry = leftIterator.next
-        val leftTime = leftEntry.getKey
-        if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
-          val leftRows = leftEntry.getValue
-          var i = 0
-          while (i < leftRows.size) {
-            joinFunction.join(leftRows.get(i), rightRow, cRowWrapper)
-            i += 1
-          }
-        }
-        if (leftTime <= leftExpirationTime) {
-          // eager remove
-          leftIterator.remove()
-        } // We could do the short-cutting optimization here once we get a state with ordered keys.
-      }
-    }
-  }
-
-  /**
-    * Called when a registered timer is fired.
-    * Remove rows whose timestamps are earlier than the expiration time,
-    * and register a new timer for the remaining rows.
-    *
-    * @param timestamp the timestamp of the timer
-    * @param ctx       the context to register timer or get current time
-    * @param out       the collector for returning result values
-    */
-  override def onTimer(
-      timestamp: Long,
-      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
-      out: Collector[CRow]): Unit = {
-
-    updateOperatorTime(ctx)
-    // In the future, we should separate the left and right watermarks. Otherwise, the
-    // registered timer of the faster stream will be delayed, even if the watermarks have
-    // already been emitted by the source.
-    if (leftTimerState.value == timestamp) {
-      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
-      removeExpiredRows(
-        rightExpirationTime,
-        rightCache,
-        leftTimerState,
-        ctx,
-        removeLeft = false
-      )
-    }
-
-    if (rightTimerState.value == timestamp) {
-      leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
-      removeExpiredRows(
-        leftExpirationTime,
-        leftCache,
-        rightTimerState,
-        ctx,
-        removeLeft = true
-      )
-    }
-  }
-
-  /**
-    * Calculate the expiration time with the given operator time and relative window size.
-    *
-    * @param operatorTime the operator time
-    * @param relativeSize the relative window size
-    * @return the expiration time for cached rows
-    */
-  private def calExpirationTime(operatorTime: Long, relativeSize: Long): Long = {
-    if (operatorTime < Long.MaxValue) {
-      operatorTime - relativeSize - allowedLateness - 1
-    } else {
-      // When operatorTime = Long.MaxValue, it means the stream has reached the end.
-      Long.MaxValue
-    }
-  }
-
-  /**
-    * Register a timer for cleaning up rows in a specified time.
-    *
-    * @param ctx        the context to register timer
-    * @param rowTime    time for the input row
-    * @param leftRow    whether this row comes from the left stream
-    */
-  private def registerCleanUpTimer(
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      rowTime: Long,
-      leftRow: Boolean): Unit = {
-    if (leftRow) {
-      val cleanupTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1
-      registerTimer(ctx, cleanupTime)
-      rightTimerState.update(cleanupTime)
-    } else {
-      val cleanupTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 1
-      registerTimer(ctx, cleanupTime)
-      leftTimerState.update(cleanupTime)
-    }
-  }
-
-  /**
-    * Remove the expired rows. Register a new timer if the cache still holds valid rows
-    * after the cleaning up.
-    *
-    * @param expirationTime the expiration time for this cache
-    * @param rowCache       the row cache
-    * @param timerState     timer state for the opposite stream
-    * @param ctx            the context to register the cleanup timer
-    * @param removeLeft     whether to remove the left rows
-    */
-  private def removeExpiredRows(
-      expirationTime: Long,
-      rowCache: MapState[Long, JList[Row]],
-      timerState: ValueState[Long],
-      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
-      removeLeft: Boolean): Unit = {
-
-    val keysIterator = rowCache.keys().iterator()
-
-    var earliestTimestamp: Long = -1L
-    var rowTime: Long = 0L
-
-    // We remove all expired keys and do not leave the loop early.
-    // Hence, we do a full pass over the state.
-    while (keysIterator.hasNext) {
-      rowTime = keysIterator.next
-      if (rowTime <= expirationTime) {
-        keysIterator.remove()
-      } else {
-        // We find the earliest timestamp that is still valid.
-        if (rowTime < earliestTimestamp || earliestTimestamp < 0) {
-          earliestTimestamp = rowTime
-        }
-      }
-    }
-
-    if (earliestTimestamp > 0) {
-      // There are rows left in the cache. Register a timer to expire them later.
-      registerCleanUpTimer(
-        ctx,
-        earliestTimestamp,
-        removeLeft)
-    } else {
-      // No rows left in the cache. Clear the states and the timerState will be 0.
-      timerState.clear()
-      rowCache.clear()
-    }
-  }
-
-  /**
-    * Update the operator time of the two streams.
-    * Must be the first call in all processing methods (i.e., processElement(), onTimer()).
-    *
-    * @param ctx the context to acquire watermarks
-    */
-  def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit
-
-  /**
-    * Return the time for the target row from the left stream.
-    *
-    * Requires that [[updateOperatorTime()]] has been called before.
-    *
-    * @param context the runtime context
-    * @param row     the target row
-    * @return time for the target row
-    */
-  def getTimeForLeftStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
-
-  /**
-    * Return the time for the target row from the right stream.
-    *
-    * Requires that [[updateOperatorTime()]] has been called before.
-    *
-    * @param context the runtime context
-    * @param row     the target row
-    * @return time for the target row
-    */
-  def getTimeForRightStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
-
-  /**
-    * Register a proctime or rowtime timer.
-    *
-    * @param ctx         the context to register the timer
-    * @param cleanupTime timestamp for the timer
-    */
-  def registerTimer(
-      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-      cleanupTime: Long): Unit
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
new file mode 100644
index 0000000..b5546db
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream joins (inner or left/right/full outer).
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X" where
+  * X and Y might be negative or positive and X <= Y.
+  *
+  * @param joinType        the join type (inner or left/right/full outer)
+  * @param leftLowerBound  the lower bound for the left stream (X in the criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param genJoinFuncName the name of the generated function
+  * @param genJoinFuncCode the code of function to evaluate the non-window join conditions
+  *
+  */
+abstract class TimeBoundedStreamJoin(
+    private val joinType: JoinType,
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val allowedLateness: Long,
+    private val leftType: TypeInformation[Row],
+    private val rightType: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  private val paddingUtil: OuterJoinPaddingUtil =
+    new OuterJoinPaddingUtil(leftType.getArity, rightType.getArity)
+
+  private var joinCollector: EmitAwareCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  protected val leftRelativeSize: Long = -leftLowerBound
+  protected val rightRelativeSize: Long = leftUpperBound
+
+  // Points in time until which the respective cache has been cleaned.
+  private var leftExpirationTime: Long = 0L
+  private var rightExpirationTime: Long = 0L
+
+  // Current time on the respective input stream.
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  // Minimum interval by which state is cleaned up
+  private val minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+  }
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+      s"Code:\n$genJoinFuncCode")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+    LOG.debug("Instantiating JoinFunction.")
+    joinFunction = clazz.newInstance()
+
+    joinCollector = new EmitAwareCollector()
+    joinCollector.setCRowChange(true)
+
+    // Initialize the data caches.
+    val leftListTypeInfo: TypeInformation[JList[JTuple2[Row, Boolean]]] =
+      new ListTypeInfo[JTuple2[Row, Boolean]] (
+        new TupleTypeInfo(leftType, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+          .asInstanceOf[TypeInformation[JTuple2[Row, Boolean]]])
+    val leftStateDescriptor: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]] =
+      new MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]](
+        "WindowJoinLeftCache",
+        Types.LONG.asInstanceOf[TypeInformation[Long]],
+        leftListTypeInfo)
+    leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
+
+    val rightListTypeInfo: TypeInformation[JList[JTuple2[Row, Boolean]]] =
+      new ListTypeInfo[JTuple2[Row, Boolean]] (
+        new TupleTypeInfo(rightType, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+          .asInstanceOf[TypeInformation[JTuple2[Row, Boolean]]])
+    val rightStateDescriptor: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]] =
+      new MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]](
+        "WindowJoinRightCache",
+        Types.LONG.asInstanceOf[TypeInformation[Long]],
+        rightListTypeInfo)
+    rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
+
+    // Initialize the timer states.
+    val leftTimerStateDesc: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("WindowJoinLeftTimerState", classOf[Long])
+    leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
+
+    val rightTimerStateDesc: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("WindowJoinRightTimerState", classOf[Long])
+    rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
+  }
+
+  /**
+    * Process rows from the left stream.
+    */
+  override def processElement1(
+      cRowValue: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    joinCollector.innerCollector = out
+    updateOperatorTime(ctx)
+    val leftRow = cRowValue.row
+    val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
+    val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
+    val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
+    var emitted: Boolean = false
+
+    // Check if we need to join the current row against cached rows of the right input.
+    // The condition here should be rightMinimumTime < rightQualifiedUpperBound.
+    // We use rightExpirationTime as an approximation of the rightMinimumTime here,
+    // since rightExpirationTime <= rightMinimumTime is always true.
+    if (rightExpirationTime < rightQualifiedUpperBound) {
+      // Upper bound of current join window has not passed the cache expiration time yet.
+      // There might be qualifying rows in the cache that the current row needs to be joined with.
+      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
+      // Join the leftRow with rows from the right cache.
+      val rightIterator = rightCache.iterator()
+      while (rightIterator.hasNext) {
+        val rightEntry = rightIterator.next
+        val rightTime = rightEntry.getKey
+        if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
+          val rightRows = rightEntry.getValue
+          var i = 0
+          var entryUpdated = false
+          while (i < rightRows.size) {
+            joinCollector.reset()
+            val tuple = rightRows.get(i)
+            joinFunction.join(leftRow, tuple.f0, joinCollector)
+            emitted ||= joinCollector.emitted
+            if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
+              if (!tuple.f1 && joinCollector.emitted) {
+                // Mark the right row as being successfully joined and emitted.
+                tuple.f1 = true
+                entryUpdated = true
+              }
+            }
+            i += 1
+          }
+          if (entryUpdated) {
+            // Write back the edited entry (mark emitted) for the right cache.
+            rightEntry.setValue(rightRows)
+          }
+        }
+
+        if (rightTime <= rightExpirationTime) {
+          if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
+            val rightRows = rightEntry.getValue
+            var i = 0
+            while (i < rightRows.size) {
+              val tuple = rightRows.get(i)
+              if (!tuple.f1) {
+                // Emit a null padding result if the right row has never been successfully joined.
+                joinCollector.collect(paddingUtil.padRight(tuple.f0))
+              }
+              i += 1
+            }
+          }
+          // eager remove
+          rightIterator.remove()
+        } // We could do the short-cutting optimization here once we get a state with ordered keys.
+      }
+    }
+
+    // Check if we need to cache the current row.
+    if (rightOperatorTime < rightQualifiedUpperBound) {
+      // Operator time of right stream has not exceeded the upper window bound of the current
+      // row. Put it into the left cache, since later coming records from the right stream are
+      // expected to be joined with it.
+      var leftRowList = leftCache.get(timeForLeftRow)
+      if (null == leftRowList) {
+        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
+      }
+      leftRowList.add(JTuple2.of(leftRow, emitted))
+      leftCache.put(timeForLeftRow, leftRowList)
+      if (rightTimerState.value == 0) {
+        // Register a timer on the RIGHT stream to remove rows.
+        registerCleanUpTimer(ctx, timeForLeftRow, leftRow = true)
+      }
+    } else if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
+      if (!emitted) {
+        // Emit a null padding result if the left row is not cached and successfully joined.
+        joinCollector.collect(paddingUtil.padLeft(leftRow))
+      }
+    }
+  }
+
+  /**
+    * Process rows from the right stream.
+    *
+    * The row is joined against the qualifying rows of the left cache, expired left rows are
+    * evicted along the way, and the row itself is either cached for left rows still to come or,
+    * for right/full outer joins, emitted with null padding if it produced no join result.
+    *
+    * @param cRowValue the incoming row of the right stream
+    * @param ctx       the context handed to the time lookups and to the timer registration
+    * @param out       the collector for returning result values
+    */
+  override def processElement2(
+      cRowValue: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    joinCollector.innerCollector = out
+    updateOperatorTime(ctx)
+    val rightRow = cRowValue.row
+    val timeForRightRow: Long = getTimeForRightStream(ctx, rightRow)
+    val leftQualifiedLowerBound: Long = timeForRightRow - leftRelativeSize
+    val leftQualifiedUpperBound: Long = timeForRightRow + rightRelativeSize
+    // Evaluate the outer-join flags once instead of once per cached row.
+    val padsLeft = joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER
+    val padsRight = joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER
+    var emitted: Boolean = false
+
+    // Check if we need to join the current row against cached rows of the left input.
+    // The condition here should be leftMinimumTime < leftQualifiedUpperBound.
+    // We use leftExpirationTime as an approximation of the leftMinimumTime here,
+    // since leftExpirationTime <= leftMinimumTime is always true.
+    if (leftExpirationTime < leftQualifiedUpperBound) {
+      leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
+      // Join the rightRow with rows from the left cache.
+      val leftIterator = leftCache.iterator()
+      while (leftIterator.hasNext) {
+        val leftEntry = leftIterator.next
+        val leftTime = leftEntry.getKey
+        if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
+          val leftRows = leftEntry.getValue
+          var i = 0
+          var entryUpdated = false
+          while (i < leftRows.size) {
+            // Reset the collector so that its emitted flag reflects this single join call.
+            joinCollector.reset()
+            val tuple = leftRows.get(i)
+            joinFunction.join(tuple.f0, rightRow, joinCollector)
+            emitted ||= joinCollector.emitted
+            if (padsLeft && !tuple.f1 && joinCollector.emitted) {
+              // Mark the left row as being successfully joined and emitted.
+              tuple.f1 = true
+              entryUpdated = true
+            }
+            i += 1
+          }
+          if (entryUpdated) {
+            // Write back the edited entry (mark emitted) for the left cache.
+            leftEntry.setValue(leftRows)
+          }
+        }
+        if (leftTime <= leftExpirationTime) {
+          if (padsLeft) {
+            val leftRows = leftEntry.getValue
+            var i = 0
+            while (i < leftRows.size) {
+              val tuple = leftRows.get(i)
+              if (!tuple.f1) {
+                // Emit a null padding result if the left row has never been successfully joined.
+                joinCollector.collect(paddingUtil.padLeft(tuple.f0))
+              }
+              i += 1
+            }
+          }
+          // eager remove
+          leftIterator.remove()
+        } // We could do the short-cutting optimization here once we get a state with ordered keys.
+      }
+    }
+
+    // Check if we need to cache the current row.
+    if (leftOperatorTime < leftQualifiedUpperBound) {
+      // Operator time of left stream has not exceeded the upper window bound of the current
+      // row. Put it into the right cache, since later coming records from the left stream are
+      // expected to be joined with it.
+      var rightRowList = rightCache.get(timeForRightRow)
+      if (null == rightRowList) {
+        rightRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
+      }
+      rightRowList.add(JTuple2.of(rightRow, emitted))
+      rightCache.put(timeForRightRow, rightRowList)
+      if (leftTimerState.value == 0) {
+        // Register a timer on the LEFT stream to remove rows.
+        registerCleanUpTimer(ctx, timeForRightRow, leftRow = false)
+      }
+    } else if (padsRight) {
+      if (!emitted) {
+        // Emit a null padding result if the right row is neither cached nor successfully joined.
+        joinCollector.collect(paddingUtil.padRight(rightRow))
+      }
+    }
+  }
+
+  /**
+    * Timer callback. For each timer state whose stored value equals the fired timestamp, the
+    * expiration time of the guarded cache is recomputed and its expired rows are removed;
+    * the removal step registers a new timer if rows remain in that cache.
+    *
+    * @param timestamp the timestamp of the timer
+    * @param ctx       the context to register timer or get current time
+    * @param out       the collector for returning result values
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    joinCollector.innerCollector = out
+    updateOperatorTime(ctx)
+
+    // In the future, we should separate the left and right watermarks. Otherwise, the
+    // registered timer of the faster stream will be delayed, even if the watermarks have
+    // already been emitted by the source.
+
+    // The left timer state guards the right cache.
+    val leftTimerFired = leftTimerState.value == timestamp
+    if (leftTimerFired) {
+      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
+      removeExpiredRows(
+        joinCollector, rightExpirationTime, rightCache, leftTimerState, ctx, removeLeft = false)
+    }
+
+    // The right timer state guards the left cache.
+    val rightTimerFired = rightTimerState.value == timestamp
+    if (rightTimerFired) {
+      leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
+      removeExpiredRows(
+        joinCollector, leftExpirationTime, leftCache, rightTimerState, ctx, removeLeft = true)
+    }
+  }
+
+  /**
+    * Derive the expiration time of cached rows from an operator time and a relative window
+    * size. Rows whose time is not greater than the result are considered expired.
+    *
+    * @param operatorTime the operator time
+    * @param relativeSize the relative window size
+    * @return the expiration time for cached rows
+    */
+  private def calExpirationTime(operatorTime: Long, relativeSize: Long): Long = {
+    // When operatorTime = Long.MaxValue, it means the stream has reached the end.
+    val streamFinished = operatorTime == Long.MaxValue
+    if (streamFinished) {
+      Long.MaxValue
+    } else {
+      operatorTime - relativeSize - allowedLateness - 1
+    }
+  }
+
+  /**
+    * Register a cleanup timer for a cached row and remember the timer's timestamp.
+    *
+    * A left row uses the left relative size and records the timestamp in the right timer
+    * state; a right row uses the right relative size and records it in the left timer state.
+    *
+    * @param ctx        the context to register timer
+    * @param rowTime    time for the input row
+    * @param leftRow    whether this row comes from the left stream
+    */
+  private def registerCleanUpTimer(
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      rowTime: Long,
+      leftRow: Boolean): Unit = {
+    val windowSize = if (leftRow) leftRelativeSize else rightRelativeSize
+    val cleanupTime = rowTime + windowSize + minCleanUpInterval + allowedLateness + 1
+    registerTimer(ctx, cleanupTime)
+    // The timer for one side's rows is tracked in the timer state of the opposite stream.
+    val oppositeTimerState = if (leftRow) rightTimerState else leftTimerState
+    oppositeTimerState.update(cleanupTime)
+  }
+
+  /**
+    * Remove the expired rows. Register a new timer if the cache still holds valid rows
+    * after the cleaning up.
+    *
+    * For outer joins that preserve the cleaned-up side, every expired row that was never
+    * marked as joined is emitted with null padding before it is dropped.
+    *
+    * @param collector      the collector to emit results
+    * @param expirationTime the expiration time for this cache
+    * @param rowCache       the row cache
+    * @param timerState     timer state for the opposite stream
+    * @param ctx            the context to register the cleanup timer
+    * @param removeLeft     whether to remove the left rows
+    */
+  private def removeExpiredRows(
+      collector: Collector[Row],
+      expirationTime: Long,
+      rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
+      timerState: ValueState[Long],
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+      removeLeft: Boolean): Unit = {
+
+    // Null padding is only needed if the join type preserves the side being cleaned up.
+    val emitPadding = if (removeLeft) {
+      joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER
+    } else {
+      joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER
+    }
+
+    val iterator = rowCache.iterator()
+
+    // -1 is the sentinel for "no unexpired row seen yet".
+    var earliestTimestamp: Long = -1L
+
+    // We remove all expired keys and do not leave the loop early.
+    // Hence, we do a full pass over the state.
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val rowTime = entry.getKey
+      if (rowTime <= expirationTime) {
+        if (emitPadding) {
+          val rows = entry.getValue
+          var i = 0
+          while (i < rows.size) {
+            val tuple = rows.get(i)
+            if (!tuple.f1) {
+              // Emit a null padding result if the row has never been successfully joined.
+              if (removeLeft) {
+                collector.collect(paddingUtil.padLeft(tuple.f0))
+              } else {
+                collector.collect(paddingUtil.padRight(tuple.f0))
+              }
+            }
+            i += 1
+          }
+        }
+        iterator.remove()
+      } else {
+        // We find the earliest timestamp that is still valid.
+        if (rowTime < earliestTimestamp || earliestTimestamp < 0) {
+          earliestTimestamp = rowTime
+        }
+      }
+    }
+
+    // Compare against the sentinel with >= 0: a remaining row with timestamp 0 is a valid row
+    // and must keep the cache alive instead of being wiped by the clear() below.
+    if (earliestTimestamp >= 0) {
+      // There are rows left in the cache. Register a timer to expire them later.
+      registerCleanUpTimer(
+        ctx,
+        earliestTimestamp,
+        removeLeft)
+    } else {
+      // No rows left in the cache. Clear the states and the timerState will be 0.
+      timerState.clear()
+      rowCache.clear()
+    }
+  }
+
+  /**
+    * Update the operator time of the two streams.
+    * Must be the first call in all processing methods (i.e., processElement(), onTimer()).
+    *
+    * Declared without a body here. processElement2() and onTimer() invoke it right after
+    * wiring the output collector and before they read leftOperatorTime or rightOperatorTime.
+    *
+    * @param ctx the context to acquire watermarks
+    */
+  def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit
+
+  /**
+    * Return the time for the target row from the left stream.
+    *
+    * Requires that [[updateOperatorTime()]] has been called before.
+    *
+    * Declared without a body here. NOTE(review): presumably the implementation decides
+    * whether this is a processing time or a row time -- confirm against the subclasses.
+    *
+    * @param context the runtime context
+    * @param row     the target row
+    * @return time for the target row
+    */
+  def getTimeForLeftStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
+
+  /**
+    * Return the time for the target row from the right stream.
+    *
+    * Requires that [[updateOperatorTime()]] has been called before.
+    *
+    * Declared without a body here. processElement2() uses the returned value to derive the
+    * qualifying time bounds for left rows and as the key under which the right row is cached.
+    *
+    * @param context the runtime context
+    * @param row     the target row
+    * @return time for the target row
+    */
+  def getTimeForRightStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
+
+  /**
+    * Register a proctime or rowtime timer.
+    *
+    * Declared without a body here. registerCleanUpTimer() calls it with the computed cleanup
+    * time and afterwards stores that same time in the corresponding timer state.
+    *
+    * @param ctx         the context to register the timer
+    * @param cleanupTime timestamp for the timer
+    */
+  def registerTimer(
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      cleanupTime: Long): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index c14b698..cef19c1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -34,6 +34,7 @@ class JoinTest extends TableTestBase {
   streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
   streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
 
+  // Tests for inner join
   @Test
   def testProcessingTimeInnerJoinWithOnClause(): Unit = {
 
@@ -379,6 +380,280 @@ class JoinTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, expected)
   }
 
+  // Tests for left outer join
+  @Test
+  def testProcTimeLeftOuterJoin(): Unit = {
+    // Processing-time LEFT OUTER JOIN bounded to one hour (3600000 ms) on both sides.
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    // Expected projections of the two join inputs.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "proctime"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "a", "b", "proctime"))
+
+    // Expected window join node carrying the left outer join type.
+    val joinCondition = term(
+      "where",
+      "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+        "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))")
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      joinCondition,
+      term("join", "a, proctime, a0, b, proctime0"),
+      term("joinType", "LeftOuterJoin"))
+
+    val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testRowTimeLeftOuterJoin(): Unit = {
+    // Row-time LEFT OUTER JOIN with bounds of 10 seconds (10000 ms) and 1 hour (3600000 ms).
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    // Expected projections of the two join inputs.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "c"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "a", "b", "c"))
+
+    // Expected window join node carrying the left outer join type.
+    val joinCondition = term(
+      "where",
+      "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+        "<=(c, DATETIME_PLUS(c0, 3600000)))")
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      joinCondition,
+      term("join", "a, c, a0, b, c0"),
+      term("joinType", "LeftOuterJoin"))
+
+    val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  // Tests for right outer join
+  @Test
+  def testProcTimeRightOuterJoin(): Unit = {
+    // Processing-time RIGHT OUTER JOIN bounded to one hour (3600000 ms) on both sides.
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    // Expected projections of the two join inputs.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "proctime"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "a", "b", "proctime"))
+
+    // Expected window join node carrying the right outer join type.
+    val joinCondition = term(
+      "where",
+      "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+        "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))")
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      joinCondition,
+      term("join", "a, proctime, a0, b, proctime0"),
+      term("joinType", "RightOuterJoin"))
+
+    val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testRowTimeRightOuterJoin(): Unit = {
+    // Row-time RIGHT OUTER JOIN with bounds of 10 seconds (10000 ms) and 1 hour (3600000 ms).
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    // Expected projections of the two join inputs.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "c"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "a", "b", "c"))
+
+    // Expected window join node carrying the right outer join type.
+    val joinCondition = term(
+      "where",
+      "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+        "<=(c, DATETIME_PLUS(c0, 3600000)))")
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      joinCondition,
+      term("join", "a, c, a0, b, c0"),
+      term("joinType", "RightOuterJoin"))
+
+    val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  // Tests for full outer join
+  @Test
+  def testProcTimeFullOuterJoin(): Unit = {
+    // Processing-time FULL OUTER JOIN bounded to one hour (3600000 ms) on both sides.
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 Full OUTER JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    // Expected projections of the two join inputs.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "proctime"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "a", "b", "proctime"))
+
+    // Expected window join node carrying the full outer join type.
+    val joinCondition = term(
+      "where",
+      "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+        "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))")
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      joinCondition,
+      term("join", "a, proctime, a0, b, proctime0"),
+      term("joinType", "FullOuterJoin"))
+
+    val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testRowTimeFullOuterJoin(): Unit = {
+    // Row-time FULL OUTER JOIN with bounds of 10 seconds (10000 ms) and 1 hour (3600000 ms).
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 FULL OUTER JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    // Expected projections of the two join inputs.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "c"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "a", "b", "c"))
+
+    // Expected window join node carrying the full outer join type.
+    val joinCondition = term(
+      "where",
+      "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+        "<=(c, DATETIME_PLUS(c0, 3600000)))")
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      joinCondition,
+      term("join", "a, c, a0, b, c0"),
+      term("joinType", "FullOuterJoin"))
+
+    val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  // Test for outer join optimization
+  @Test
+  def testOuterJoinOpt(): Unit = {
+    // Row-time FULL OUTER JOIN followed by a WHERE predicate on columns of both inputs.
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 FULL OUTER JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+        |  WHERE t1.b LIKE t2.b
+        |""".stripMargin
+
+    // Expected projections of the two join inputs; both sides keep column b for the LIKE.
+    val leftInput =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "a", "b", "c"))
+    val rightInput =
+      unaryNode("DataStreamCalc", streamTableNode(1), term("select", "a", "b", "c"))
+
+    val joinCondition = term(
+      "where",
+      "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+        "<=(c, DATETIME_PLUS(c0, 3600000)), LIKE(b, b0))")
+    val windowJoin = binaryNode(
+      "DataStreamWindowJoin",
+      leftInput,
+      rightInput,
+      joinCondition,
+      term("join", "a, b, c, a0, b0, c0"),
+      // Since we filter on attributes b and b0 after the join, the full outer join
+      // will be automatically optimized to inner join.
+      term("joinType", "InnerJoin"))
+
+    val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b0 AS b"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  // Other tests
   @Test
   def testJoinTimeBoundary(): Unit = {
     verifyTimeBoundary(