You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/07/18 07:40:06 UTC

[1/3] flink git commit: [FLINK-6232] [table] Add support for processing time inner windowed stream join.

Repository: flink
Updated Branches:
  refs/heads/master 1125122a7 -> 471345c0e


[FLINK-6232] [table] Add support for processing time inner windowed stream join.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba6c59e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba6c59e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba6c59e6

Branch: refs/heads/master
Commit: ba6c59e6d744958db7332f88ffdd46effc9ad400
Parents: 1125122
Author: hongyuhong <ho...@huawei.com>
Authored: Thu Jul 6 11:24:04 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 18 00:36:41 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 109 +-----
 .../calcite/RelTimeIndicatorConverter.scala     |  10 +-
 .../flink/table/plan/nodes/CommonJoin.scala     |  73 ++++
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  47 ++-
 .../nodes/datastream/DataStreamWindowJoin.scala | 187 ++++++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   1 +
 .../datastream/DataStreamWindowJoinRule.scala   |  94 +++++
 .../runtime/join/ProcTimeWindowInnerJoin.scala  | 326 +++++++++++++++++
 .../table/runtime/join/WindowJoinUtil.scala     | 349 +++++++++++++++++++
 .../table/updateutils/UpdateCheckUtils.scala    | 128 +++++++
 .../table/api/scala/stream/sql/JoinITCase.scala | 117 +++++++
 .../table/api/scala/stream/sql/JoinTest.scala   | 235 +++++++++++++
 .../table/runtime/harness/JoinHarnessTest.scala | 236 +++++++++++++
 .../KeyedTwoInputStreamOperatorTestHarness.java |   9 +
 14 files changed, 1788 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c5a66b5..f026824 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -50,6 +50,7 @@ import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowIn
 import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.updateutils.UpdateCheckUtils
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
@@ -173,10 +174,10 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
         // check for append only table
-        val isAppendOnlyTable = isAppendOnly(optimizedPlan)
+        val isAppendOnlyTable = UpdateCheckUtils.isAppendOnly(optimizedPlan)
         upsertSink.setIsAppendOnly(isAppendOnlyTable)
         // extract unique key fields
-        val tableKeys: Option[Array[String]] = getUniqueKeyFields(optimizedPlan)
+        val tableKeys: Option[Array[String]] = UpdateCheckUtils.getUniqueKeyFields(optimizedPlan)
         // check that we have keys if the table has changes (is not append-only)
         tableKeys match {
           case Some(keys) => upsertSink.setKeyFields(keys)
@@ -200,7 +201,7 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
         // verify table is an insert-only (append-only) table
-        if (!isAppendOnly(optimizedPlan)) {
+        if (!UpdateCheckUtils.isAppendOnly(optimizedPlan)) {
           throw new TableException(
             "AppendStreamTableSink requires that Table has only insert changes.")
         }
@@ -259,21 +260,6 @@ abstract class StreamTableEnvironment(
     }
   }
 
-  /** Validates that the plan produces only append changes. */
-  protected def isAppendOnly(plan: RelNode): Boolean = {
-    val appendOnlyValidator = new AppendOnlyValidator
-    appendOnlyValidator.go(plan)
-
-    appendOnlyValidator.isAppendOnly
-  }
-
-  /** Extracts the unique keys of the table produced by the plan. */
-  protected def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
-    val keyExtractor = new UniqueKeyExtractor
-    keyExtractor.go(plan)
-    keyExtractor.keys
-  }
-
   /**
     * Creates a converter that maps the internal CRow type to Scala or Java Tuple2 with change flag.
     *
@@ -665,7 +651,7 @@ abstract class StreamTableEnvironment(
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
     // if no change flags are requested, verify table is an insert-only (append-only) table.
-    if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
+    if (!withChangeFlag && !UpdateCheckUtils.isAppendOnly(logicalPlan)) {
       throw new TableException(
         "Table is not an append-only table. " +
         "Use the toRetractStream() in order to handle add and retract messages.")
@@ -749,90 +735,5 @@ abstract class StreamTableEnvironment(
         s"$sqlPlan"
   }
 
-  private class AppendOnlyValidator extends RelVisitor {
-
-    var isAppendOnly = true
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case s: DataStreamRel if s.producesUpdates =>
-          isAppendOnly = false
-        case _ =>
-          super.visit(node, ordinal, parent)
-      }
-    }
-  }
-
-  /** Identifies unique key fields in the output of a RelNode. */
-  private class UniqueKeyExtractor extends RelVisitor {
-
-    var keys: Option[Array[String]] = None
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case c: DataStreamCalc =>
-          super.visit(node, ordinal, parent)
-          // check if input has keys
-          if (keys.isDefined) {
-            // track keys forward
-            val inNames = c.getInput.getRowType.getFieldNames
-            val inOutNames = c.getProgram.getNamedProjects.asScala
-              .map(p => {
-                c.getProgram.expandLocalRef(p.left) match {
-                    // output field is forwarded input field
-                  case i: RexInputRef => (i.getIndex, p.right)
-                    // output field is renamed input field
-                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
-                    a.getOperands.get(0) match {
-                      case ref: RexInputRef =>
-                        (ref.getIndex, p.right)
-                      case _ =>
-                        (-1, p.right)
-                    }
-                    // output field is not forwarded from input
-                  case _: RexNode => (-1, p.right)
-                }
-              })
-              // filter all non-forwarded fields
-              .filter(_._1 >= 0)
-              // resolve names of input fields
-              .map(io => (inNames.get(io._1), io._2))
-
-            // filter by input keys
-            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
-            // check if all keys have been preserved
-            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
-              // all key have been preserved (but possibly renamed)
-              keys = Some(outKeys.toArray)
-            } else {
-              // some (or all) keys have been removed. Keys are no longer unique and removed
-              keys = None
-            }
-          }
-        case _: DataStreamOverAggregate =>
-          super.visit(node, ordinal, parent)
-          // keys are always forwarded by Over aggregate
-        case a: DataStreamGroupAggregate =>
-          // get grouping keys
-          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
-          keys = Some(groupKeys.toArray)
-        case w: DataStreamGroupWindowAggregate =>
-          // get grouping keys
-          val groupKeys =
-            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
-          // get window start and end time
-          val windowStartEnd = w.getWindowProperties.map(_.name)
-          // we have only a unique key if at least one window property is selected
-          if (windowStartEnd.nonEmpty) {
-            keys = Some(groupKeys ++ windowStartEnd)
-          }
-        case _: DataStreamRel =>
-          // anything else does not forward keys or might duplicate key, so we can stop
-          keys = None
-      }
-    }
-
-  }
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 385cab2..32d6f01 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -165,8 +165,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
     LogicalProject.create(input, projects, fieldNames)
   }
 
-  override def visit(join: LogicalJoin): RelNode =
-    throw new TableException("Logical join in a stream environment is not supported yet.")
+  override def visit(join: LogicalJoin): RelNode = {
+    val left = join.getLeft.accept(this)
+    val right = join.getRight.accept(this)
+    // both inputs were visited by this shuttle; rebuild the join on top of the results
+    LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, join.getJoinType)
+
+  }
+
 
   override def visit(correlate: LogicalCorrelate): RelNode = {
     // visit children and update inputs

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
new file mode 100644
index 0000000..7d0ca35
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.RelWriter
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConverters._
+
+trait CommonJoin {
+
+  /** Comma-separated list of the field names of `inputType`. */
+  private[flink] def joinSelectionToString(inputType: RelDataType): String =
+    inputType.getFieldNames.asScala.toList.mkString(", ")
+
+  /** Formats `joinCondition` with `expression`, passing the field names of `inputType`. */
+  private[flink] def joinConditionToString(
+      inputType: RelDataType,
+      joinCondition: RexNode,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+    val fieldNames = inputType.getFieldNames.asScala.toList
+    expression(joinCondition, fieldNames, None)
+  }
+
+  /** Display name of the join type. */
+  private[flink] def joinTypeToString(joinType: JoinRelType) = joinType match {
+    case JoinRelType.INNER => "InnerJoin"
+    case JoinRelType.LEFT => "LeftOuterJoin"
+    case JoinRelType.RIGHT => "RightOuterJoin"
+    case JoinRelType.FULL => "FullOuterJoin"
+  }
+
+  /** One-line description of the join: type name, condition and field names. */
+  private[flink] def joinToString(
+      inputType: RelDataType,
+      joinCondition: RexNode,
+      joinType: JoinRelType,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+    val typeName = joinTypeToString(joinType)
+    val where = joinConditionToString(inputType, joinCondition, expression)
+    val selection = joinSelectionToString(inputType)
+    s"$typeName(where: ($where), join: ($selection))"
+  }
+
+  /** Adds the "where", "join" and "joinType" items to `pw` and returns the writer. */
+  private[flink] def joinExplainTerms(
+      pw: RelWriter,
+      inputType: RelDataType,
+      joinCondition: RexNode,
+      joinType: JoinRelType,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): RelWriter =
+    pw
+      .item("where", joinConditionToString(inputType, joinCondition, expression))
+      .item("join", joinSelectionToString(inputType))
+      .item("joinType", joinTypeToString(joinType))
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index a6c31d3..e8f3b82 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -31,11 +31,11 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.runtime.FlatJoinRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -55,6 +55,7 @@ class DataSetJoin(
     joinHint: JoinHint,
     ruleDescription: String)
   extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
   with DataSetRel {
 
   override def deriveRowType() = rowRelDataType
@@ -76,14 +77,20 @@ class DataSetJoin(
   }
 
   override def toString: String = {
-    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+    joinToString(
+      joinRowType,
+      joinCondition,
+      joinType,
+      getExpressionString)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("where", joinConditionToString)
-      .item("join", joinSelectionToString)
-      .item("joinType", joinTypeToString)
+    joinExplainTerms(
+      super.explainTerms(pw),
+      joinRowType,
+      joinCondition,
+      joinType,
+      getExpressionString)
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -116,7 +123,8 @@ class DataSetJoin(
         "Joins should have at least one equality condition.\n" +
           s"\tLeft: ${left.toString},\n" +
           s"\tRight: ${right.toString},\n" +
-          s"\tCondition: ($joinConditionToString)"
+          s"\tCondition: (${joinConditionToString(joinRowType,
+            joinCondition, getExpressionString)})"
       )
     }
     else {
@@ -138,7 +146,8 @@ class DataSetJoin(
             "Equality join predicate on incompatible types.\n" +
               s"\tLeft: ${left.toString},\n" +
               s"\tRight: ${right.toString},\n" +
-              s"\tCondition: ($joinConditionToString)"
+              s"\tCondition: (${joinConditionToString(joinRowType,
+                joinCondition, getExpressionString)})"
           )
         }
       })
@@ -197,7 +206,9 @@ class DataSetJoin(
       genFunction.code,
       genFunction.returnType)
 
-    val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
+    val joinOpName =
+      s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
+        s"join: (${joinSelectionToString(joinRowType)})"
 
     joinOperator
       .where(leftKeys.toArray: _*)
@@ -205,22 +216,4 @@ class DataSetJoin(
       .`with`(joinFun)
       .name(joinOpName)
   }
-
-  private def joinSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-  private def joinConditionToString: String = {
-
-    val inFields = joinRowType.getFieldNames.asScala.toList
-    getExpressionString(joinCondition, inFields, None)
-  }
-
-  private def joinTypeToString = joinType match {
-    case JoinRelType.INNER => "InnerJoin"
-    case JoinRelType.LEFT=> "LeftOuterJoin"
-    case JoinRelType.RIGHT => "RightOuterJoin"
-    case JoinRelType.FULL => "FullOuterJoin"
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
new file mode 100644
index 0000000..1315a79
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * RelNode for a windowed join of two streams that is translated into a DataStream program.
+  */
+class DataStreamWindowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    joinCondition: RexNode,
+    joinType: JoinRelType,
+    leftSchema: RowSchema,
+    rightSchema: RowSchema,
+    schema: RowSchema,
+    isRowTime: Boolean,
+    leftLowerBound: Long,
+    leftUpperBound: Long,
+    remainCondition: Option[RexNode],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+    with CommonJoin
+    with DataStreamRel {
+
+  /** The row type of this node is the logical type of the output schema. */
+  override def deriveRowType() = schema.logicalType
+
+  /** Creates a new node with the given traits and inputs; all other properties are kept. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    val newLeft = inputs.get(0)
+    val newRight = inputs.get(1)
+    new DataStreamWindowJoin(
+      cluster, traitSet,
+      newLeft, newRight,
+      joinCondition, joinType,
+      leftSchema, rightSchema, schema,
+      isRowTime, leftLowerBound, leftUpperBound,
+      remainCondition,
+      ruleDescription)
+  }
+
+  /** Describes the join by its type, its condition and the fields of the output type. */
+  override def toString: String =
+    joinToString(schema.logicalType, joinCondition, joinType, getExpressionString)
+
+  /** Extends the terms of the super class by the join condition, fields and type. */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val inputTerms = super.explainTerms(pw)
+    joinExplainTerms(
+      inputTerms, schema.logicalType, joinCondition, joinType, getExpressionString)
+  }
+
+  /**
+    * Translates this node into a DataStream program.
+    *
+    * Only processing-time inner joins are translated. A [[TableException]] is thrown if an
+    * input is not append-only, for row-time inner joins, and for full, left and right joins.
+    */
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    // the join is only translated if both inputs are append-only
+    val leftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
+    val rightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+    if (!(leftAppendOnly && rightAppendOnly)) {
+      throw new TableException(
+        "Windowed stream join does not support updates.")
+    }
+
+    // translate both inputs
+    val leftStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+    val rightStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+
+    // get the equality keys of the join condition
+    val joinInfo = JoinInfo.of(leftNode, rightNode, joinCondition)
+    val leftKeys = joinInfo.leftKeys.toIntArray
+    val rightKeys = joinInfo.rightKeys.toIntArray
+
+    // generate the join function
+    val joinFunction = WindowJoinUtil.generateJoinFunction(
+      config,
+      joinType,
+      leftSchema.physicalTypeInfo,
+      rightSchema.physicalTypeInfo,
+      schema,
+      remainCondition,
+      ruleDescription)
+
+    // only the processing-time inner join is implemented so far
+    joinType match {
+      case JoinRelType.INNER if isRowTime =>
+        // RowTime JoinCoProcessFunction
+        throw new TableException(
+          "RowTime inner join between stream and stream is not supported yet.")
+      case JoinRelType.INNER =>
+        // Proctime JoinCoProcessFunction
+        createProcTimeInnerJoinFunction(
+          leftStream, rightStream, joinFunction.name, joinFunction.code, leftKeys, rightKeys)
+      case JoinRelType.FULL =>
+        throw new TableException(
+          "Full join between stream and stream is not supported yet.")
+      case JoinRelType.LEFT =>
+        throw new TableException(
+          "Left join between stream and stream is not supported yet.")
+      case JoinRelType.RIGHT =>
+        throw new TableException(
+          "Right join between stream and stream is not supported yet.")
+    }
+  }
+
+  /**
+    * Connects both inputs and applies a [[ProcTimeWindowInnerJoin]] function to them.
+    *
+    * If equality keys exist, the connected streams are keyed by them. Otherwise, both inputs
+    * are keyed with a [[NullByteKeySelector]] and the join runs with a parallelism of 1.
+    */
+  def createProcTimeInnerJoinFunction(
+      leftDataStream: DataStream[CRow],
+      rightDataStream: DataStream[CRow],
+      joinFunctionName: String,
+      joinFunctionCode: String,
+      leftKeys: Array[Int],
+      rightKeys: Array[Int]): DataStream[CRow] = {
+
+    // the result type is derived from the physical type of the output schema
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+
+    val procInnerJoinFunc = new ProcTimeWindowInnerJoin(
+      leftLowerBound,
+      leftUpperBound,
+      leftSchema.physicalTypeInfo,
+      rightSchema.physicalTypeInfo,
+      joinFunctionName,
+      joinFunctionCode)
+
+    val connectedStreams = leftDataStream.connect(rightDataStream)
+    if (leftKeys.isEmpty) {
+      // no equality keys: key both inputs with NullByteKeySelector and use parallelism 1
+      connectedStreams
+        .keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow]())
+        .process(procInnerJoinFunc)
+        .setParallelism(1)
+        .setMaxParallelism(1)
+        .returns(returnTypeInfo)
+    } else {
+      // key both inputs by the equality keys of the join condition
+      connectedStreams
+        .keyBy(leftKeys, rightKeys)
+        .process(procInnerJoinFunc)
+        .returns(returnTypeInfo)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index ebfbeb9..90bd624 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -191,6 +191,7 @@ object FlinkRuleSets {
     DataStreamUnionRule.INSTANCE,
     DataStreamValuesRule.INSTANCE,
     DataStreamCorrelateRule.INSTANCE,
+    DataStreamWindowJoinRule.INSTANCE,
     StreamTableSourceScanRule.INSTANCE
   )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
new file mode 100644
index 0000000..c7a190f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamWindowJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+
+class DataStreamWindowJoinRule
+  extends ConverterRule(
+    classOf[FlinkLogicalJoin],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamJoinRule") {
+
+  /** Matches if analyzing the time boundaries does not fail with a [[TableException]]. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+    val joinInfo = join.analyzeCondition
+
+    try {
+      val rexBuilder = join.getCluster.getRexBuilder
+      // the result is discarded; only success or failure of the analysis matters here
+      WindowJoinUtil.analyzeTimeBoundary(
+        joinInfo.getRemaining(rexBuilder),
+        join.getLeft.getRowType.getFieldCount,
+        new RowSchema(join.getRowType),
+        rexBuilder,
+        TableConfig.DEFAULT)
+      true
+    } catch {
+      case _: TableException => false
+    }
+  }
+
+  /**
+    * Converts the logical join into a [[DataStreamWindowJoin]] with the time boundaries and
+    * the remaining condition returned by the time boundary analysis.
+    */
+  override def convert(rel: RelNode): RelNode = {
+    val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin]
+
+    // target traits and inputs in the DataStream convention
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM)
+    val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConventions.DATASTREAM)
+    val joinInfo = join.analyzeCondition
+    val leftRowSchema = new RowSchema(convLeft.getRowType)
+    val rightRowSchema = new RowSchema(convRight.getRowType)
+    val rexBuilder = join.getCluster.getRexBuilder
+
+    // analyze the non-equi part of the condition for the time boundaries
+    val (isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition) =
+      WindowJoinUtil.analyzeTimeBoundary(
+        joinInfo.getRemaining(rexBuilder),
+        leftRowSchema.logicalArity,
+        new RowSchema(join.getRowType),
+        rexBuilder,
+        TableConfig.DEFAULT)
+
+    new DataStreamWindowJoin(
+      rel.getCluster, traitSet, convLeft, convRight,
+      join.getCondition, join.getJoinType,
+      leftRowSchema, rightRowSchema, new RowSchema(rel.getRowType),
+      isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition,
+      description)
+  }
+}
+
+object DataStreamWindowJoinRule {
+  val INSTANCE: RelOptRule = new DataStreamWindowJoinRule // instance listed in FlinkRuleSets
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
new file mode 100644
index 0000000..97d5ccc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction that joins two streams based on processing time.
+  * Currently, only inner joins are supported.
+  *
+  * @param leftLowerBound
+  *        the left stream lower bound, and -leftLowerBound is the right stream upper bound
+  * @param leftUpperBound
+  *        the left stream upper bound, and -leftUpperBound is the right stream lower bound
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param genJoinFuncName    the function name of other non-equi condition
+  * @param genJoinFuncCode    the function code of other non-equi condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val element1Type: TypeInformation[Row],
+    private val element2Type: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+    with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration): Unit = {
+    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+      s"Code:\n$genJoinFuncCode")
+    val joinFunctionClass = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+    LOG.debug("Instantiating JoinFunction.")
+    joinFunction = joinFunctionClass.newInstance()
+
+    listToRemove = new util.ArrayList[Long]()
+    cRowWrapper = new CRowWrappingCollector()
+    cRowWrapper.setChange(true)
+
+    // type of the map keys, shared by both row states
+    val keyTypeInfo: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+
+    // row state of the left input
+    val leftRowsTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
+    val leftRowsDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate", keyTypeInfo, leftRowsTypeInfo)
+    row1MapState = getRuntimeContext.getMapState(leftRowsDescriptor)
+
+    // row state of the right input
+    val rightRowsTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element2Type)
+    val rightRowsDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate", keyTypeInfo, rightRowsTypeInfo)
+    row2MapState = getRuntimeContext.getMapState(rightRowsDescriptor)
+
+    // timer state of the left input
+    val leftTimerDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = getRuntimeContext.getState(leftTimerDescriptor)
+
+    // timer state of the right input
+    val rightTimerDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+    timerState2 = getRuntimeContext.getState(rightTimerDescriptor)
+  }
+
+  /**
+    * Handles a record of the left stream by delegating to the shared element logic,
+    * using the state of the left stream and the time bounds of the right stream.
+    *
+    * @param valueC The input value.
+    * @param ctx   The ctx to register timer or get current time
+    * @param out   The collector for returning result values.
+    *
+    */
+  override def processElement1(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    // the bounds of the right stream are the negated and swapped bounds of the left stream
+    val rightLowerBound = -leftUpperBound
+    val rightUpperBound = -leftLowerBound
+
+    processElement(
+      valueC,
+      ctx,
+      out,
+      leftStreamWinSize,
+      timerState1,
+      row1MapState,
+      row2MapState,
+      rightLowerBound,
+      rightUpperBound,
+      isLeft = true
+    )
+  }
+
+  /**
+    * Handles a record of the right stream by delegating to the shared element logic,
+    * using the state of the right stream and the time bounds of the left stream.
+    *
+    * @param valueC The input value.
+    * @param ctx   The ctx to register timer or get current time
+    * @param out   The collector for returning result values.
+    *
+    */
+  override def processElement2(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    // the left stream is the opposite stream here, so its bounds are passed unchanged:
+    // leftLowerBound is the left stream lower bound, leftUpperBound the left stream upper bound
+    processElement(
+      valueC,
+      ctx,
+      out,
+      rightStreamWinSize,
+      timerState2,
+      row2MapState,
+      row1MapState,
+      leftLowerBound,
+      leftUpperBound,
+      isLeft = false
+    )
+  }
+
+  /**
+    * Called when a processing time timer fires.
+    * Expires the records of each input whose registered timer equals the firing timestamp.
+    *
+    * @param timestamp The timestamp of the firing timer.
+    * @param ctx       The ctx to register timer or get current time
+    * @param out       The collector for returning result values.
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    // clean up the left state if the firing timer belongs to the left input
+    val leftTimerFired = timerState1.value == timestamp
+    if (leftTimerFired) {
+      expireOutTimeRow(timestamp, leftStreamWinSize, row1MapState, timerState1, ctx)
+    }
+
+    // clean up the right state if the firing timer belongs to the right input
+    val rightTimerFired = timerState2.value == timestamp
+    if (rightTimerFired) {
+      expireOutTimeRow(timestamp, rightStreamWinSize, row2MapState, timerState2, ctx)
+    }
+  }
+
+  /**
+    * Puts an element from the input stream into state (if its window size is not zero),
+    * searches the state of the other stream and emits the records that are within the
+    * time bounds via the join function, and registers a timer for the current record
+    * if there is no timer at present.
+    *
+    * @param valueC          the input record
+    * @param ctx             the context to register timers and to get the current time
+    * @param out             the collector for returning result values
+    * @param winSize         the window size of the stream the record belongs to
+    * @param timerState      the timer state of the stream the record belongs to
+    * @param rowMapState     the row state of the stream the record belongs to
+    * @param oppoRowMapState the row state of the opposite stream
+    * @param oppoLowerBound  offset added to the current time to get the earliest joinable
+    *                        timestamp of the opposite stream
+    * @param oppoUpperBound  offset added to the current time to get the latest joinable
+    *                        timestamp of the opposite stream
+    * @param isLeft          true if the record belongs to the left stream
+    */
+  private def processElement(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      winSize: Long,
+      timerState: ValueState[Long],
+      rowMapState: MapState[Long, JList[Row]],
+      oppoRowMapState: MapState[Long, JList[Row]],
+      oppoLowerBound: Long,
+      oppoUpperBound: Long,
+      isLeft: Boolean): Unit = {
+
+    cRowWrapper.out = out
+
+    val value = valueC.row
+
+    // time range [oppoLowerTime, oppoUpperTime] of opposite records that join with this record
+    val curProcessTime = ctx.timerService.currentProcessingTime
+    val oppoLowerTime = curProcessTime + oppoLowerBound
+    val oppoUpperTime = curProcessTime + oppoUpperBound
+
+    // only when windowsize != 0, we need to store the element
+    if (winSize != 0) {
+      // register a timer to expire the element
+      // (a timer state value of 0 means that no timer is registered)
+      if (timerState.value == 0) {
+        ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
+        timerState.update(curProcessTime + winSize + 1)
+      }
+
+      // append the record to the list of records that arrived at the same processing time
+      var rowList = rowMapState.get(curProcessTime)
+      if (rowList == null) {
+        rowList = new util.ArrayList[Row]()
+      }
+      rowList.add(value)
+      rowMapState.put(curProcessTime, rowList)
+
+    }
+
+    // loop the other stream elements
+    val oppositeKeyIter = oppoRowMapState.keys().iterator()
+    while (oppositeKeyIter.hasNext) {
+      val eleTime = oppositeKeyIter.next()
+      if (eleTime < oppoLowerTime) {
+        // too old to join with this record, remember the timestamp for removal
+        listToRemove.add(eleTime)
+      } else if (eleTime <= oppoUpperTime) {
+        // within the bounds; the join function always gets the left row as first argument
+        val oppoRowList = oppoRowMapState.get(eleTime)
+        var i = 0
+        if (isLeft) {
+          while (i < oppoRowList.size) {
+            joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
+            i += 1
+          }
+        } else {
+          while (i < oppoRowList.size) {
+            joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
+            i += 1
+          }
+        }
+      }
+    }
+
+    // expire records out-of-time
+    // (removal is deferred until the iteration over the state keys has finished)
+    var i = listToRemove.size - 1
+    while (i >= 0) {
+      oppoRowMapState.remove(listToRemove.get(i))
+      i -= 1
+    }
+    listToRemove.clear()
+  }
+
+  /**
+    * Removes records which are outside the join window from the state.
+    * Registers a new timer if the state still holds records after the clean-up.
+    *
+    * @param curTime     the timestamp of the firing timer
+    * @param winSize     the window size of the stream to clean up
+    * @param rowMapState the row state of the stream to clean up
+    * @param timerState  the timer state of the stream to clean up
+    * @param ctx         the context to register the follow-up timer
+    */
+  private def expireOutTimeRow(
+      curTime: Long,
+      winSize: Long,
+      rowMapState: MapState[Long, JList[Row]],
+      timerState: ValueState[Long],
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+    // records with a timestamp smaller than expiredTime are removed
+    val expiredTime = curTime - winSize
+    val keyIter = rowMapState.keys().iterator()
+    // 0 means that no non-expired timestamp has been found
+    var nextTimer: Long = 0
+    // Search for expired timestamps.
+    // If we find a non-expired timestamp, remember the timestamp and leave the loop.
+    // This way we find all expired timestamps if they are sorted without doing a full pass.
+    // NOTE(review): if keys() is not iterated in ascending order, expired records behind the
+    // first non-expired key stay in state until a later timer - confirm for the state backend.
+    while (keyIter.hasNext && nextTimer == 0) {
+      val recordTime = keyIter.next
+      if (recordTime < expiredTime) {
+        listToRemove.add(recordTime)
+      } else {
+        nextTimer = recordTime
+      }
+    }
+
+    // Remove expired records from state
+    var i = listToRemove.size - 1
+    while (i >= 0) {
+      rowMapState.remove(listToRemove.get(i))
+      i -= 1
+    }
+    listToRemove.clear()
+
+    // If the state has non-expired timestamps, register a new timer.
+    // Otherwise clean the complete state for this input.
+    if (nextTimer != 0) {
+      ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
+      timerState.update(nextTimer + winSize + 1)
+    } else {
+      timerState.clear()
+      rowMapState.clear()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
new file mode 100644
index 0000000..fabeeba
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A utility class that helps to analyze join conditions and to build join code.
+  */
+object WindowJoinUtil {
+
+  /**
+    * Analyzes the time conditions to get the time boundaries of the left stream and the
+    * time type, and returns the remaining condition.
+    *
+    * Throws a TableException if the condition is not a conjunction, if it does not contain
+    * exactly two time predicates, or if the two predicates do not form one lower and one
+    * upper bound.
+    *
+    * @param  condition           join condition
+    * @param  leftLogicalFieldCnt left stream logical field num
+    * @param  inputSchema         join rowtype schema
+    * @param  rexBuilder          util to build rexNode
+    * @param  config              table environment config
+    * @return isRowTime, left lower boundary, left upper boundary, remain condition
+    */
+  private[flink] def analyzeTimeBoundary(
+      condition: RexNode,
+      leftLogicalFieldCnt: Int,
+      inputSchema: RowSchema,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+
+    // Converts the condition to conjunctive normal form (CNF)
+    val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+
+    // split the condition into time indicator condition and other condition
+    val (timeTerms, remainTerms) = cnfCondition match {
+      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+        // classify every conjunct and merge the per-term results
+        c.getOperands.asScala
+          .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
+          .reduceLeft((l, r) => {
+            (l._1 ++ r._1, l._2 ++ r._2)
+          })
+      case _ =>
+        throw new TableException("A time-based stream join requires exactly " +
+          "two join predicates that bound the time in both directions.")
+    }
+
+    if (timeTerms.size != 2) {
+      throw new TableException("A time-based stream join requires exactly " +
+        "two join predicates that bound the time in both directions.")
+    }
+
+    // extract time offset from the time indicator condition
+    // each entry is (boundary, isLeftLowerBound)
+    val streamTimeOffsets =
+    timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, config))
+
+    // exactly one of the two predicates must be the lower and the other the upper bound
+    val (leftLowerBound, leftUpperBound) =
+      streamTimeOffsets match {
+        case Seq((x, true), (y, false)) => (x, y)
+        case Seq((x, false), (y, true)) => (y, x)
+        case _ =>
+          throw new TableException(
+            "Time-based join conditions must reference the time attribute of both input tables.")
+      }
+
+    // compose the remain condition list into one condition
+    val remainCondition =
+    remainTerms match {
+      case Seq() => None
+      case _ =>
+        // Converts logical field references to physical ones.
+        Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
+          RelOptUtil.andJoinFilters(rexBuilder, l, r)
+        }))
+    }
+
+    // the time type is taken from the first time predicate:
+    // anything that is not a proctime indicator is treated as rowtime
+    val isRowTime: Boolean = timeTerms(0)._1 match {
+      case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
+      case _ => true
+    }
+    (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
+  }
+
+  /**
+    * Classifies a single conjunct of the join condition as either a time condition
+    * or a non-time condition.
+    *
+    * @return (Seq(timeTerms), Seq(remainTerms)), a time term consists of the time type,
+    *         whether the first accessed time attribute belongs to the left input, and the term
+    */
+  private def analyzeCondtionTermType(
+      conditionTerm: RexNode,
+      leftFieldCount: Int,
+      inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], Seq[RexNode]) = {
+
+    // only <, <=, >, >= comparisons may be time conditions
+    val isComparison = conditionTerm match {
+      case call: RexCall =>
+        call.getKind match {
+          case SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL |
+               SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL => true
+          case _ => false
+        }
+      case _ => false
+    }
+
+    val accessedTimeAttributes =
+      extractTimeIndicatorAccesses(conditionTerm, leftFieldCount, inputType)
+
+    if (isComparison) {
+      accessedTimeAttributes match {
+        case Seq() =>
+          // comparison without time attributes is a regular predicate
+          (Seq(), Seq(conditionTerm))
+        case Seq(first, second) =>
+          val (firstType, firstIsLeft) = first
+          val (secondType, secondIsLeft) = second
+          if (firstType != secondType) {
+            throw new TableException(
+              "Both time attributes in a join condition must be of the same type.")
+          }
+          if (firstIsLeft == secondIsLeft) {
+            throw new TableException("Time-based join conditions " +
+              "must reference the time attribute of both input tables.")
+          }
+          (Seq((firstType, firstIsLeft, conditionTerm)), Seq())
+        case _ =>
+          throw new TableException(
+            "Time-based join conditions must reference the time attribute of both input tables.")
+      }
+    } else {
+      accessedTimeAttributes match {
+        case Seq() =>
+          (Seq(), Seq(conditionTerm))
+        case _ =>
+          throw new TableException("Time indicators can not be used in non time-condition.")
+      }
+    }
+  }
+
+  /**
+    * Extracts all time indicator attributes that are accessed in an expression.
+    *
+    * Input references are resolved against the fields of inputType. A reference with an
+    * index smaller than leftFieldCount belongs to the left input, all others to the right
+    * input. Calls are traversed recursively, any other expression yields no accesses.
+    *
+    * @param expression     the expression to analyze
+    * @param leftFieldCount the number of fields of the left input
+    * @param inputType      the row type the input references point into
+    * @return seq(timeType, is left input time indicator)
+    */
+  def extractTimeIndicatorAccesses(
+      expression: RexNode,
+      leftFieldCount: Int,
+      inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
+
+    expression match {
+      case i: RexInputRef =>
+        val idx = i.getIndex
+        inputType.getFieldList.get(idx).getType match {
+          case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
+            // left table time indicator
+            Seq((t, true))
+          case t: TimeIndicatorRelDataType =>
+            // right table time indicator
+            Seq((t, false))
+          case _ => Seq()
+        }
+      case c: RexCall =>
+        // flatMap also handles calls without operands (e.g., niladic functions),
+        // for which a reduce over the operand results would throw an exception
+        c.operands.asScala
+          .flatMap(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
+          .toList
+      case _ => Seq()
+    }
+  }
+
+  /**
+    * Computes the bound of the left table's time attribute relative to the right table's
+    * time attribute from a comparison expression and whether the bound is a lower or an
+    * upper bound.
+    *
+    * The comparison is normalized to "left.time OP right.time + offset". For the inclusive
+    * operators (>=, <=) the offset is the bound. For the strict operators the bound is
+    * tightened by one: "left.time > right.time + offset" is a lower bound of offset + 1 and
+    * "left.time < right.time + offset" is an upper bound of offset - 1.
+    *
+    * @param timeTerm                  the comparison call of the time condition
+    * @param isLeftExprBelongLeftTable true if the left operand of the comparison accesses
+    *                                  the time attribute of the left table
+    * @param rexBuilder                util to build rexNode
+    * @param config                    table environment config
+    * @return window boundary, is left lower bound
+    */
+  def extractTimeOffsetFromCondition(
+      timeTerm: RexNode,
+      isLeftExprBelongLeftTable: Boolean,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (Long, Boolean) = {
+
+    val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
+
+    val isLeftLowerBound: Boolean =
+      timeTerm.getKind match {
+        // e.g a.proctime > b.proctime - 5 sec, then it's the lower bound of a and the value is -5
+        // e.g b.proctime > a.proctime - 5 sec, then it's not the lower bound of a but upper bound
+        case SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL =>
+          isLeftExprBelongLeftTable
+        // e.g a.proctime < b.proctime + 5 sec, then the upper bound of a is 5
+        case SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL =>
+          !isLeftExprBelongLeftTable
+        case _ =>
+          throw new TableException("Unsupported time-condition.")
+      }
+
+    val (leftLiteral, rightLiteral) =
+      reduceTimeExpression(
+        timeCall.operands.get(0),
+        timeCall.operands.get(1),
+        rexBuilder,
+        config)
+    // offset of the normalized condition "left.time OP right.time + offset"
+    val timeOffset: Long =
+      if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else leftLiteral - rightLiteral
+
+    val isStrict =
+      timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == SqlKind.GREATER_THAN
+
+    // A strict comparison excludes the offset itself. The adjustment depends on the kind of
+    // the bound and not on the sign of the offset: a strict lower bound moves up by one,
+    // a strict upper bound moves down by one.
+    val boundary =
+      if (!isStrict) {
+        timeOffset
+      } else if (isLeftLowerBound) {
+        timeOffset + 1
+      } else {
+        timeOffset - 1
+      }
+
+    (boundary, isLeftLowerBound)
+  }
+
+  /**
+    * Computes the constant offsets of both sides of a time condition by substituting
+    * a zero literal for the time attribute and reducing the resulting expressions.
+    * For example:
+    * b.proctime - interval '1' second - interval '2' second will be translated to
+    * 0 - 1000 - 2000
+    *
+    * @return the reduced values of the left and the right expression
+    */
+  private def reduceTimeExpression(
+      leftRexNode: RexNode,
+      rightRexNode: RexNode,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (Long, Long) = {
+
+    /**
+      * Substitutes a zero literal for every rowtime/proctime reference.
+      */
+    def zeroOutTimeFields(node: RexNode): RexNode = node match {
+      case call: RexCall =>
+        // rebuild the call with rewritten operands
+        val rewrittenOps = call.operands.asScala.map(zeroOutTimeFields(_)).asJava
+        rexBuilder.makeCall(call.getType, call.getOperator, rewrittenOps)
+      case ref: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(ref.getType) =>
+        // time attribute becomes a zero literal of the same type
+        rexBuilder.makeZeroLiteral(node.getType)
+      case _: RexInputRef =>
+        throw new TableException("Time join condition may only reference time indicator fields.")
+      case other => other
+    }
+
+    val toReduce = new util.ArrayList[RexNode]()
+    toReduce.add(zeroOutTimeFields(leftRexNode))
+    toReduce.add(zeroOutTimeFields(rightRexNode))
+
+    val reduced = new util.ArrayList[RexNode]()
+    new ExpressionReducer(config).reduce(rexBuilder, toReduce, reduced)
+
+    // both expressions must have been reduced to literals
+    val offsets = reduced.asScala.map {
+      case literal: RexLiteral =>
+        literal.getValue2.asInstanceOf[Long]
+      case _ =>
+        throw TableException(
+          "Time condition may only consist of time attributes, literals, and arithmetic operators.")
+    }
+
+    (offsets(0), offsets(1))
+  }
+
+
+  /**
+    * Generates the join function that evaluates the remaining (non-time) condition,
+    * if there is one, and emits the joined row.
+    *
+    * @param  config          table env config
+    * @param  joinType        join type to determine whether input can be null
+    * @param  leftType        left stream type
+    * @param  rightType       right stream type
+    * @param  returnType      return type
+    * @param  otherCondition  non-equi condition
+    * @param  ruleDescription rule description
+    */
+  private[flink] def generateJoinFunction(
+      config: TableConfig,
+      joinType: JoinRelType,
+      leftType: TypeInformation[Row],
+      rightType: TypeInformation[Row],
+      returnType: RowSchema,
+      otherCondition: Option[RexNode],
+      ruleDescription: String) = {
+
+    // inputs can only be null for outer joins
+    val inputCanBeNull = joinType match {
+      case JoinRelType.INNER => false
+      case JoinRelType.LEFT | JoinRelType.RIGHT | JoinRelType.FULL => true
+    }
+
+    val codeGenerator = new CodeGenerator(
+      config,
+      inputCanBeNull,
+      leftType,
+      Some(rightType))
+
+    // code that converts both inputs into the result row
+    val resultConversion = codeGenerator.generateConverterResultExpression(
+      returnType.physicalTypeInfo,
+      returnType.physicalType.getFieldNames.asScala)
+
+    val functionBody = otherCondition match {
+      case Some(remainCondition) =>
+        // emit the result only if the remaining condition holds
+        val conditionCode = codeGenerator.generateExpression(remainCondition)
+        s"""
+           |${conditionCode.code}
+           |if (${conditionCode.resultTerm}) {
+           |  ${resultConversion.code}
+           |  ${codeGenerator.collectorTerm}.collect(${resultConversion.resultTerm});
+           |}
+           |""".stripMargin
+      case None =>
+        // without a remaining condition the result is emitted directly
+        s"""
+           |${resultConversion.code}
+           |${codeGenerator.collectorTerm}.collect(${resultConversion.resultTerm});
+           |""".stripMargin
+    }
+
+    codeGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Row, Row, Row]],
+      functionBody,
+      returnType.physicalTypeInfo)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
new file mode 100644
index 0000000..769ba55
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.updateutils
+
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.datastream._
+import _root_.scala.collection.JavaConverters._
+
+object UpdateCheckUtils {
+
+  /** Checks whether the plan produces only append changes. */
+  def isAppendOnly(plan: RelNode): Boolean = {
+    val validator = new AppendOnlyValidator
+    // traverse the plan; the validator records whether any node produces updates
+    validator.go(plan)
+    validator.isAppendOnly
+  }
+
+  /** Returns the unique key fields of the table produced by the plan, if there are any. */
+  def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+    val extractor = new UniqueKeyExtractor
+    // traverse the plan; the extractor tracks the keys while visiting the nodes
+    extractor.go(plan)
+    extractor.keys
+  }
+
+  /** Visitor that records whether any visited node produces updates. */
+  private class AppendOnlyValidator extends RelVisitor {
+
+    var isAppendOnly = true
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      val nodeProducesUpdates = node match {
+        case streamRel: DataStreamRel => streamRel.producesUpdates
+        case _ => false
+      }
+
+      if (nodeProducesUpdates) {
+        // no need to descend further once an updating node has been found
+        isAppendOnly = false
+      } else {
+        super.visit(node, ordinal, parent)
+      }
+    }
+  }
+
+  /**
+    * Identifies unique key fields in the output of a RelNode.
+    *
+    * The result is available in `keys` after the traversal. `None` means that no unique
+    * key could be determined.
+    */
+  private class UniqueKeyExtractor extends RelVisitor {
+
+    var keys: Option[Array[String]] = None
+
+    // NOTE(review): the match has no default case, a node that matches none of the cases
+    // below raises a MatchError - confirm that only DataStreamRel nodes are visited.
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case c: DataStreamCalc =>
+          // visit the input first to determine its keys
+          super.visit(node, ordinal, parent)
+          // check if input has keys
+          if (keys.isDefined) {
+            // track keys forward
+            val inNames = c.getInput.getRowType.getFieldNames
+            // pairs of (input field name, output field name) for all forwarded fields
+            val inOutNames = c.getProgram.getNamedProjects.asScala
+              .map(p => {
+                c.getProgram.expandLocalRef(p.left) match {
+                  // output field is forwarded input field
+                  case i: RexInputRef => (i.getIndex, p.right)
+                  // output field is renamed input field
+                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+                    a.getOperands.get(0) match {
+                      case ref: RexInputRef =>
+                        (ref.getIndex, p.right)
+                      case _ =>
+                        (-1, p.right)
+                    }
+                  // output field is not forwarded from input
+                  case _: RexNode => (-1, p.right)
+                }
+              })
+              // filter all non-forwarded fields
+              .filter(_._1 >= 0)
+              // resolve names of input fields
+              .map(io => (inNames.get(io._1), io._2))
+
+            // filter by input keys
+            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+            // check if all keys have been preserved
+            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+              // all key have been preserved (but possibly renamed)
+              keys = Some(outKeys.toArray)
+            } else {
+              // some (or all) keys have been removed. Keys are no longer unique and removed
+              keys = None
+            }
+          }
+        case _: DataStreamOverAggregate =>
+          super.visit(node, ordinal, parent)
+        // keys are always forwarded by Over aggregate
+        case a: DataStreamGroupAggregate =>
+          // get grouping keys
+          // (the first getGroupings.length output fields are taken as the grouping fields)
+          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+          keys = Some(groupKeys.toArray)
+        case w: DataStreamGroupWindowAggregate =>
+          // get grouping keys
+          val groupKeys =
+            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+          // get window start and end time
+          val windowStartEnd = w.getWindowProperties.map(_.name)
+          // we have only a unique key if at least one window property is selected
+          // (otherwise keys is left unchanged)
+          if (windowStartEnd.nonEmpty) {
+            keys = Some(groupKeys ++ windowStartEnd)
+          }
+        case _: DataStreamRel =>
+          // anything else does not forward keys or might duplicate key, so we can stop
+          keys = None
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
new file mode 100644
index 0000000..d4ff3f7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+    (1L, 1, "Hello"),
+    (2L, 2, "Hello"),
+    (3L, 3, "Hello"),
+    (4L, 4, "Hello"),
+    (5L, 5, "Hello"),
+    (6L, 6, "Hello"),
+    (7L, 7, "Hello World"),
+    (8L, 8, "Hello World"),
+    (20L, 20, "Hello World"))
+
+  /** Runs a processing-time inner join end to end; the method itself asserts nothing. **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    // equi-join on a, limited to rows whose proctime lies within 5 seconds of each other
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
+    // rows of input T1
+    val leftRows = mutable.MutableList[(Int, Long, String)](
+      (1, 1L, "Hi1"),
+      (1, 2L, "Hi2"),
+      (1, 5L, "Hi3"),
+      (2, 7L, "Hi5"),
+      (1, 9L, "Hi6"),
+      (1, 8L, "Hi8"))
+    // rows of input T2
+    val rightRows = mutable.MutableList[(Int, Long, String)](
+      (1, 1L, "HiHi"),
+      (2, 2L, "HeHe"))
+
+    val leftTable = env.fromCollection(leftRows).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val rightTable = env.fromCollection(rightRows).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    // the query refers to the inputs by these names
+    tEnv.registerTable("T1", leftTable)
+    tEnv.registerTable("T2", rightTable)
+
+    // run the query and attach the test sink to its append stream
+    tEnv.sql(sqlQuery).toAppendStream[Row].addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  /** Processing-time inner join with additional non-time predicates; nothing is asserted. **/
+  @Test
+  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    // time-bounded equi-join on a, plus two additional predicates on column b
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second " +
+      "and t2.proctime + interval '5' second " +
+      "and t1.b > t2.b and t1.b + t2.b < 14"
+    // rows of input T1
+    val leftRows = mutable.MutableList[(String, Long, String)](
+      ("1", 1L, "Hi1"),
+      ("1", 2L, "Hi2"),
+      ("1", 5L, "Hi3"),
+      ("2", 7L, "Hi5"),
+      ("1", 9L, "Hi6"),
+      ("1", 8L, "Hi8"))
+    // rows of input T2
+    val rightRows = mutable.MutableList[(String, Long, String)](
+      ("1", 5L, "HiHi"),
+      ("2", 2L, "HeHe"))
+
+    val leftTable = env.fromCollection(leftRows).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val rightTable = env.fromCollection(rightRows).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    // the query refers to the inputs by these names
+    tEnv.registerTable("T1", leftTable)
+    tEnv.registerTable("T2", rightTable)
+
+    // run the query and attach the test sink to its append stream
+    tEnv.sql(sqlQuery).toAppendStream[Row].addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
new file mode 100644
index 0000000..15e8b89
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+  @Test
+  def testProcessingTimeInnerJoin(): Unit = {
+    // a BETWEEN predicate on proctime is expected to yield a DataStreamWindowJoin in the plan
+    val sqlQuery = "SELECT t1.a, t2.b " +
+      "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where",
+            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+          term("join", "a, proctime, a0, b, proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  /** A stream join without any time predicate is expected to fail with a TableException **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinUnExistTimeCondition(): Unit = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** A single time predicate (lower bound only) is expected to fail with a TableException **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinSingleTimeCondition(): Unit = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
+      " and t1.proctime > t2.proctime - interval '5' second"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Time predicates mixing proctime (t1.proctime) and rowtime (t2.c) are expected to fail **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinDiffTimeIndicator(): Unit = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
+      " and t1.proctime > t2.proctime - interval '5' second " +
+      " and t1.proctime < t2.c + interval '5' second"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** OR-connected time predicates are rejected (NOTE(review): t2.c is also a rowtime here) **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinNotCnfCondition(): Unit = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
+      " and (t1.proctime > t2.proctime - interval '5' second " +
+      " or t1.proctime < t2.c + interval '5' second)"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  @Test
+  def testJoinTimeBoundary(): Unit = {
+    verifyTimeBoundary(
+      "t1.proctime between t2.proctime - interval '1' hour " +
+        "and t2.proctime + interval '1' hour",
+      -3600000,
+      3600000,
+      "proctime")
+    // strict comparisons (> and <): expected bounds are 1 less in magnitude than the interval
+    verifyTimeBoundary(
+      "t1.proctime > t2.proctime - interval '1' second and " +
+        "t1.proctime < t2.proctime + interval '1' second",
+      -999,
+      999,
+      "proctime")
+    // inclusive comparisons on the rowtime attribute c: expected bounds equal the interval
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '1' second and " +
+        "t1.c <= t2.c + interval '1' second",
+      -1000,
+      1000,
+      "rowtime")
+    // lower predicate without an offset: expected lower bound is 0
+    verifyTimeBoundary(
+      "t1.c >= t2.c and " +
+        "t1.c <= t2.c + interval '1' second",
+      0,
+      1000,
+      "rowtime")
+    // both offsets positive: both expected bounds are positive
+    verifyTimeBoundary(
+      "t1.c >= t2.c + interval '1' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      1000,
+      10000,
+      "rowtime")
+    // operands swapped: the t1 attribute is on the right-hand side of both comparisons
+    verifyTimeBoundary(
+      "t2.c - interval '1' second <= t1.c and " +
+        "t2.c + interval '10' second >= t1.c",
+      -1000,
+      10000,
+      "rowtime")
+    // offsets on both sides of the lower predicate: 1s - 10s + 2s = -7s
+    verifyTimeBoundary(
+      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
+        "interval '10' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      -7000,
+      10000,
+      "rowtime")
+    // both offsets negative: both expected bounds are negative
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '10' second and " +
+        "t1.c <= t2.c - interval '5' second",
+      -10000,
+      -5000,
+      "rowtime")
+  }
+
+  @Test
+  def testJoinRemainConditionConvert(): Unit = {
+    streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    val query =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
+        "t1.c > t2.c"
+    verifyRemainConditionConvert(
+      query,
+      ">($1, $3)")
+    // without a non-time predicate the expected remaining condition is the empty string
+    val query1 =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
+    verifyRemainConditionConvert(
+      query1,
+      "")
+    // same shape as the first query, but bounded on proctime (tables declare no rowtime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
+    val query2 =
+      "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
+        "t1.proctime >= t2.proctime - interval '10' second " +
+        "and t1.proctime <= t2.proctime - interval '5' second and " +
+        "t1.c > t2.c"
+    verifyRemainConditionConvert(
+      query2,
+      ">($2, $5)")
+  }
+
+  def verifyTimeBoundary(
+      timeSql: String,
+      expLeftSize: Long,
+      expRightSize: Long,
+      expTimeType: String): Unit = {
+    val query =
+      "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
+    // locate the LogicalJoin directly below the root of the query's RelNode tree
+    val resultTable = streamUtil.tEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val rexNode = joinNode.getCondition
+    // NOTE(review): the literal 4 is presumably the field count of the left input - confirm
+    val (isRowTime, lowerBound, upperBound, _) =
+      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
+        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
+
+    val timeTypeStr = if (isRowTime) "rowtime" else "proctime"
+    // expLeftSize / expRightSize are compared against the extracted lower / upper bound
+    assertEquals(expLeftSize, lowerBound)
+    assertEquals(expRightSize, upperBound)
+    assertEquals(expTimeType, timeTypeStr)
+  }
+
+  def verifyRemainConditionConvert(
+      query: String,
+      expectCondStr: String): Unit = {
+    // locate the LogicalJoin directly below the root of the query's RelNode tree
+    val resultTable = streamUtil.tEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val joinInfo = joinNode.analyzeCondition
+    val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+    val (_, _, _, remainCondition) =
+      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
+        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
+    // an absent remaining condition is rendered as the empty string
+    val actual: String = remainCondition.map(_.toString).getOrElse("")
+
+    assertEquals(expectCondStr, actual)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
new file mode 100644
index 0000000..c008ed3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness}
+import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Test
+
+
+class JoinHarnessTest extends HarnessTestBase{
+
+  private val rT = new RowTypeInfo(Array[TypeInformation[_]](
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO),
+    Array("a", "b"))
+
+
+  val funcCode: String =
+    """
+      |public class TestJoinFunction
+      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {
+      |  transient org.apache.flink.types.Row out =
+      |            new org.apache.flink.types.Row(4);
+      |  public TestJoinFunction() throws Exception {}
+      |
+      |  @Override
+      |  public void open(org.apache.flink.configuration.Configuration parameters)
+      |  throws Exception {}
+      |
+      |  @Override
+      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)
+      |   throws Exception {
+      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;
+      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;
+      |
+      |   out.setField(0, in1.getField(0));
+      |   out.setField(1, in1.getField(1));
+      |   out.setField(2, in2.getField(0));
+      |   out.setField(3, in2.getField(1));
+      |
+      |   c.collect(out);
+      |
+      |  }
+      |
+      |  @Override
+      |  public void close() throws Exception {}
+      |}
+    """.stripMargin
+
+  /** Window join for a.proctime >= b.proctime - 10 and a.proctime <= b.proctime + 20 **/
+  @Test
+  def testNormalProcTimeJoin(): Unit = {
+
+    val joinProcessFunc = new ProcTimeWindowInnerJoin(-10, 20, rT, rT, "TestJoinFunction", funcCode)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+        operator,
+        new TupleRowKeySelector[Integer](0),
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO,
+        1, 1, 0)
+
+    testHarness.open()
+
+    // left stream input
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), true), 1))
+    assert(testHarness.numProcessingTimeTimers() == 1)
+    testHarness.setProcessingTime(2)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), true), 2))
+    assert(testHarness.numProcessingTimeTimers() == 2)
+    testHarness.setProcessingTime(3)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa2"), true), 3))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // right stream input and output normally
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi1"), true), 3))
+    testHarness.setProcessingTime(4)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "Hello1"), true), 4))
+    assert(testHarness.numKeyedStateEntries() == 8)
+    assert(testHarness.numProcessingTimeTimers() == 4)
+
+    // expired left stream record at timestamp 1
+    testHarness.setProcessingTime(12)
+    assert(testHarness.numKeyedStateEntries() == 8)
+    assert(testHarness.numProcessingTimeTimers() == 4)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi2"), true), 12))
+
+    // expired right stream record at timestamp 4 and all left stream
+    testHarness.setProcessingTime(25)
+    assert(testHarness.numKeyedStateEntries() == 2)
+    assert(testHarness.numProcessingTimeTimers() == 1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3"), true), 25))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb2"), true), 25))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "Hello2"), true), 25))
+
+    testHarness.setProcessingTime(45)
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(46)
+    assert(testHarness.numKeyedStateEntries() == 0)
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), true), 3))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa2", 1: JInt, "Hi1"), true), 3))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), true), 4))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa2", 1: JInt, "Hi2"), true), 12))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3", 1: JInt, "Hi2"), true), 25))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb2", 2: JInt, "Hello2"), true), 25))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+  /** Window join for a.proctime >= b.proctime - 10 and a.proctime <= b.proctime - 5 **/
+  @Test
+  def testProcTimeJoinSingleNeedStore(): Unit = {
+
+    val joinProcessFunc = new ProcTimeWindowInnerJoin(-10, -5, rT, rT, "TestJoinFunction", funcCode)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+        operator,
+        new TupleRowKeySelector[Integer](0),
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO,
+        1, 1, 0)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa1"), true), 1))
+    testHarness.setProcessingTime(2)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa2"), true), 2))
+    testHarness.setProcessingTime(3)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3"), true), 3))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // Do not store b elements
+    // not meet a.proctime <= b.proctime - 5
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb3"), true), 3))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // meet a.proctime <= b.proctime - 5
+    testHarness.setProcessingTime(7)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb7"), true), 7))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // expire record of stream a at timestamp 1
+    testHarness.setProcessingTime(12)
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb12"), true), 12))
+
+    testHarness.setProcessingTime(13)
+    assert(testHarness.numKeyedStateEntries() == 2)
+    assert(testHarness.numProcessingTimeTimers() == 1)
+
+    testHarness.setProcessingTime(14)
+    assert(testHarness.numKeyedStateEntries() == 0)
+    assert(testHarness.numProcessingTimeTimers() == 0)
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 10c79d0..b0500ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
@@ -123,4 +124,12 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 
 		super.initializeState(operatorStateHandles);
 	}
+
+	/** Returns the number of state entries reported by the heap keyed state backend. */
+	public int numKeyedStateEntries() {
+		if (!(keyedStateBackend instanceof HeapKeyedStateBackend)) {
+			throw new UnsupportedOperationException("Only supported for HeapKeyedStateBackend.");
+		}
+		return ((HeapKeyedStateBackend<?>) keyedStateBackend).numStateEntries();
+	}
 }


[3/3] flink git commit: [FLINK-6232] [table] Add SQL documentation for time window join.

Posted by fh...@apache.org.
[FLINK-6232] [table] Add SQL documentation for time window join.

- Add support for window join predicates in WHERE clause.
- Refactoring of WindowJoinUtil.
- Minor refactorings of join classes.

This closes #4324.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/471345c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/471345c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/471345c0

Branch: refs/heads/master
Commit: 471345c0ea9930a582786cc08bd290d374b28c5a
Parents: ba6c59e
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 13 00:49:30 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 18 00:57:37 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  29 ++
 .../table/api/StreamTableEnvironment.scala      |  10 +-
 .../calcite/RelTimeIndicatorConverter.scala     | 166 ++++----
 .../flink/table/plan/nodes/CommonCalc.scala     |  25 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  12 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   2 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  25 +-
 .../nodes/datastream/DataStreamWindowJoin.scala |  40 +-
 .../datastream/DataStreamWindowJoinRule.scala   |  55 ++-
 .../table/plan/util/UpdatingPlanChecker.scala   | 129 ++++++
 .../runtime/join/ProcTimeWindowInnerJoin.scala  | 134 +++---
 .../table/runtime/join/WindowJoinUtil.scala     | 408 ++++++++++++-------
 .../table/updateutils/UpdateCheckUtils.scala    | 128 ------
 .../table/api/scala/stream/sql/JoinITCase.scala | 117 ------
 .../table/api/scala/stream/sql/JoinTest.scala   | 235 -----------
 .../flink/table/api/stream/sql/JoinTest.scala   | 250 ++++++++++++
 .../sql/validation/JoinValidationTest.scala     |  95 +++++
 .../plan/TimeIndicatorConversionTest.scala      |   2 +-
 .../table/runtime/harness/JoinHarnessTest.scala |  61 ++-
 .../table/runtime/stream/sql/JoinITCase.scala   | 106 +++++
 20 files changed, 1167 insertions(+), 862 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 81aa3d9..e677b60 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -369,6 +369,35 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
       </td>
     </tr>
     <tr>
+      <td><strong>Time-windowed Join</strong><br>
+        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+        <p>A time-windowed join requires a special join condition that bounds the time on both sides. This can be done either by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or by a single <code>BETWEEN</code> predicate that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+          <ul>
+            <li>Time predicates must compare time attributes of both input tables.</li>
+            <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
+            <li>Only range predicates are valid time predicates.</li>
+            <li>Non-time predicates must not access a time attribute.</li>
+          </ul>
+        </p>
+
+        <p><b>Note:</b> Currently, only processing time window joins and <code>INNER</code> joins are supported.</p>
+
+{% highlight sql %}
+SELECT *
+FROM Orders o, Shipments s
+WHERE o.id = s.orderId AND
+      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+The example above will join all orders with their corresponding shipments if the order was shipped within four hours after the order was received.
+      </td>
+    </tr>
+    <tr>
     	<td>
         <strong>Expanding arrays into a relation</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f026824..669b017 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -45,12 +45,12 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
-import org.apache.flink.table.updateutils.UpdateCheckUtils
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
@@ -174,10 +174,10 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
         // check for append only table
-        val isAppendOnlyTable = UpdateCheckUtils.isAppendOnly(optimizedPlan)
+        val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(optimizedPlan)
         upsertSink.setIsAppendOnly(isAppendOnlyTable)
         // extract unique key fields
-        val tableKeys: Option[Array[String]] = UpdateCheckUtils.getUniqueKeyFields(optimizedPlan)
+        val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
         // check that we have keys if the table has changes (is not append-only)
         tableKeys match {
           case Some(keys) => upsertSink.setKeyFields(keys)
@@ -201,7 +201,7 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
         // verify table is an insert-only (append-only) table
-        if (!UpdateCheckUtils.isAppendOnly(optimizedPlan)) {
+        if (!UpdatingPlanChecker.isAppendOnly(optimizedPlan)) {
           throw new TableException(
             "AppendStreamTableSink requires that Table has only insert changes.")
         }
@@ -651,7 +651,7 @@ abstract class StreamTableEnvironment(
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
     // if no change flags are requested, verify table is an insert-only (append-only) table.
-    if (!withChangeFlag && !UpdateCheckUtils.isAppendOnly(logicalPlan)) {
+    if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(logicalPlan)) {
       throw new TableException(
         "Table is not an append-only table. " +
         "Use the toRetractStream() in order to handle add and retract messages.")

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 32d6f01..d76613e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -42,9 +42,9 @@ import scala.collection.mutable
 class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
 
   private val timestamp = rexBuilder
-      .getTypeFactory
-      .asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    .getTypeFactory
+    .asInstanceOf[FlinkTypeFactory]
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
 
   override def visit(intersect: LogicalIntersect): RelNode =
     throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -138,15 +138,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
     // visit children and update inputs
     val input = filter.getInput.accept(this)
 
-    // check if input field contains time indicator type
-    // materialize field if no time indicator is present anymore
-    // if input field is already materialized, change to timestamp type
-    val materializer = new RexTimeIndicatorMaterializer(
-      rexBuilder,
-      input.getRowType.getFieldList.map(_.getType))
-
-    val condition = filter.getCondition.accept(materializer)
-    LogicalFilter.create(input, condition)
+    // We do not materialize time indicators in conditions because they can be locally evaluated.
+    // Some conditions are evaluated by special operators (e.g., time window joins).
+    // Time indicators in remaining conditions are materialized by Calc before the code generation.
+    LogicalFilter.create(input, filter.getCondition)
   }
 
   override def visit(project: LogicalProject): RelNode = {
@@ -306,68 +301,6 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
       updatedAggCalls)
   }
 
-  class RexTimeIndicatorMaterializer(
-      private val rexBuilder: RexBuilder,
-      private val input: Seq[RelDataType])
-    extends RexShuttle {
-
-    override def visitInputRef(inputRef: RexInputRef): RexNode = {
-      // reference is interesting
-      if (isTimeIndicatorType(inputRef.getType)) {
-        val resolvedRefType = input(inputRef.getIndex)
-        // input is a valid time indicator
-        if (isTimeIndicatorType(resolvedRefType)) {
-          inputRef
-        }
-        // input has been materialized
-        else {
-          new RexInputRef(inputRef.getIndex, resolvedRefType)
-        }
-      }
-      // reference is a regular field
-      else {
-        super.visitInputRef(inputRef)
-      }
-    }
-
-    override def visitCall(call: RexCall): RexNode = {
-      val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
-
-      // materialize operands with time indicators
-      val materializedOperands = updatedCall.getOperator match {
-
-        // skip materialization for special operators
-        case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
-          updatedCall.getOperands.toList
-
-        case _ =>
-          updatedCall.getOperands.map { o =>
-            if (isTimeIndicatorType(o.getType)) {
-              rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
-            } else {
-              o
-            }
-          }
-      }
-
-      // remove time indicator return type
-      updatedCall.getOperator match {
-
-        // we do not modify AS if operand has not been materialized
-        case SqlStdOperatorTable.AS if
-            isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
-          updatedCall
-
-        // materialize function's result and operands
-        case _ if isTimeIndicatorType(updatedCall.getType) =>
-          updatedCall.clone(timestamp, materializedOperands)
-
-        // materialize function's operands only
-        case _ =>
-          updatedCall.clone(updatedCall.getType, materializedOperands)
-      }
-    }
-  }
 }
 
 object RelTimeIndicatorConverter {
@@ -421,4 +354,89 @@ object RelTimeIndicatorConverter {
 
     new RelRecordType(fields)
   }
+
+  /**
+    * Materializes time indicator accesses in an expression.
+    *
+    * @param expr The expression in which time indicators are materialized.
+    * @param rowType The input schema of the expression.
+    * @param rexBuilder A RexBuilder.
+    *
+    * @return The expression with materialized time indicators.
+    */
+  def convertExpression(expr: RexNode, rowType: RelDataType, rexBuilder: RexBuilder): RexNode = {
+    val materializer = new RexTimeIndicatorMaterializer(
+          rexBuilder,
+          rowType.getFieldList.map(_.getType))
+
+        expr.accept(materializer)
+  }
+}
+
+class RexTimeIndicatorMaterializer(
+  private val rexBuilder: RexBuilder,
+  private val input: Seq[RelDataType])
+  extends RexShuttle {
+
+  private val timestamp = rexBuilder
+    .getTypeFactory
+    .asInstanceOf[FlinkTypeFactory]
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode = {
+    // reference is interesting
+    if (isTimeIndicatorType(inputRef.getType)) {
+      val resolvedRefType = input(inputRef.getIndex)
+      // input is a valid time indicator
+      if (isTimeIndicatorType(resolvedRefType)) {
+        inputRef
+      }
+      // input has been materialized
+      else {
+        new RexInputRef(inputRef.getIndex, resolvedRefType)
+      }
+    }
+    // reference is a regular field
+    else {
+      super.visitInputRef(inputRef)
+    }
+  }
+
+  override def visitCall(call: RexCall): RexNode = {
+    val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+    // materialize operands with time indicators
+    val materializedOperands = updatedCall.getOperator match {
+
+      // skip materialization for special operators
+      case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+        updatedCall.getOperands.toList
+
+      case _ =>
+        updatedCall.getOperands.map { o =>
+          if (isTimeIndicatorType(o.getType)) {
+            rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+          } else {
+            o
+          }
+        }
+    }
+
+    // remove time indicator return type
+    updatedCall.getOperator match {
+
+      // we do not modify AS if operand has not been materialized
+      case SqlStdOperatorTable.AS if
+      isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
+        updatedCall
+
+      // materialize function's result and operands
+      case _ if isTimeIndicatorType(updatedCall.getType) =>
+        updatedCall.clone(timestamp, materializedOperands)
+
+      // materialize function's operands only
+      case _ =>
+        updatedCall.clone(updatedCall.getType, materializedOperands)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 693924e..3e355ff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -22,12 +22,10 @@ import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 trait CommonCalc {
@@ -37,39 +35,26 @@ trait CommonCalc {
       ruleDescription: String,
       inputSchema: RowSchema,
       returnSchema: RowSchema,
-      calcProgram: RexProgram,
+      calcProjection: Seq[RexNode],
+      calcCondition: Option[RexNode],
       config: TableConfig,
       functionClass: Class[T]):
     GeneratedFunction[T, Row] = {
 
-    val expandedExpressions = calcProgram
-      .getProjectList
-      .map(expr => calcProgram.expandLocalRef(expr))
-      // time indicator fields must not be part of the code generation
-      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
-      // update indices
-      .map(expr => inputSchema.mapRexNode(expr))
-
-    val condition = if (calcProgram.getCondition != null) {
-      inputSchema.mapRexNode(calcProgram.expandLocalRef(calcProgram.getCondition))
-    } else {
-      null
-    }
-
     val projection = generator.generateResultExpression(
       returnSchema.physicalTypeInfo,
       returnSchema.physicalFieldNames,
-      expandedExpressions)
+      calcProjection)
 
     // only projection
-    val body = if (condition == null) {
+    val body = if (calcCondition.isEmpty) {
       s"""
         |${projection.code}
         |${generator.collectorTerm}.collect(${projection.resultTerm});
         |""".stripMargin
     }
     else {
-      val filterCondition = generator.generateExpression(condition)
+      val filterCondition = generator.generateExpression(calcCondition.get)
       // only filter
       if (projection == null) {
         s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index a923acc..7417eb2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -35,6 +35,8 @@ import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
+import scala.collection.JavaConverters._
+
 /**
   * Flink RelNode which matches along with LogicalCalc.
   *
@@ -90,12 +92,20 @@ class DataSetCalc(
 
     val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
+    val projection = calcProgram.getProjectList.asScala.map(calcProgram.expandLocalRef)
+    val condition = if (calcProgram.getCondition != null) {
+      Some(calcProgram.expandLocalRef(calcProgram.getCondition))
+    } else {
+      None
+    }
+
     val genFunction = generateFunction(
       generator,
       ruleDescription,
       new RowSchema(getInput.getRowType),
       new RowSchema(getRowType),
-      calcProgram,
+      projection,
+      condition,
       config,
       classOf[FlatMapFunction[Row, Row]])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index e8f3b82..1583e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -58,7 +58,7 @@ class DataSetJoin(
   with CommonJoin
   with DataSetRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetJoin(

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index d626c46..2e00330 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -28,11 +28,14 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
 import org.apache.flink.table.plan.nodes.CommonCalc
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.CRowProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
+import scala.collection.JavaConverters._
+
 /**
   * Flink RelNode which matches along with FlatMapOperator.
   *
@@ -93,6 +96,25 @@ class DataStreamCalc(
     val inputDataStream =
       getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
+    // materialize time attributes in condition
+    val condition = if (calcProgram.getCondition != null) {
+      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
+        calcProgram.expandLocalRef(calcProgram.getCondition),
+        inputSchema.logicalType,
+        cluster.getRexBuilder)
+      Some(materializedCondition)
+    } else {
+      None
+    }
+
+    // filter out time attributes
+    val projection = calcProgram.getProjectList.asScala
+      .map(calcProgram.expandLocalRef)
+      // time indicator fields must not be part of the code generation
+      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
+      // update indices
+      .map(expr => inputSchema.mapRexNode(expr))
+
     val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo)
 
     val genFunction = generateFunction(
@@ -100,7 +122,8 @@ class DataStreamCalc(
       ruleDescription,
       inputSchema,
       schema,
-      calcProgram,
+      projection,
+      condition,
       config,
       classOf[ProcessFunction[CRow, CRow]])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 1315a79..987947c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
@@ -27,12 +28,12 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.updateutils.UpdateCheckUtils
 
 /**
-  * Flink RelNode which matches along with JoinOperator and its related operations.
+  * RelNode for a time windowed stream join.
   */
 class DataStreamWindowJoin(
     cluster: RelOptCluster,
@@ -53,7 +54,7 @@ class DataStreamWindowJoin(
     with CommonJoin
     with DataStreamRel {
 
-  override def deriveRowType() = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamWindowJoin(
@@ -96,8 +97,8 @@ class DataStreamWindowJoin(
 
     val config = tableEnv.getConfig
 
-    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
-    val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+    val isLeftAppendOnly = UpdatingPlanChecker.isAppendOnly(left)
+    val isRightAppendOnly = UpdatingPlanChecker.isAppendOnly(right)
     if (!isLeftAppendOnly || !isRightAppendOnly) {
       throw new TableException(
         "Windowed stream join does not support updates.")
@@ -124,21 +125,20 @@ class DataStreamWindowJoin(
 
     joinType match {
       case JoinRelType.INNER =>
-        isRowTime match {
-          case false =>
-            // Proctime JoinCoProcessFunction
-            createProcTimeInnerJoinFunction(
-              leftDataStream,
-              rightDataStream,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          case true =>
-            // RowTime JoinCoProcessFunction
-            throw new TableException(
-              "RowTime inner join between stream and stream is not supported yet.")
+        if (isRowTime) {
+          // RowTime JoinCoProcessFunction
+          throw new TableException(
+            "RowTime inner join between stream and stream is not supported yet.")
+        } else {
+          // Proctime JoinCoProcessFunction
+          createProcTimeInnerJoinFunction(
+            leftDataStream,
+            rightDataStream,
+            joinFunction.name,
+            joinFunction.code,
+            leftKeys,
+            rightKeys
+          )
         }
       case JoinRelType.FULL =>
         throw new TableException(

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
index c7a190f..2075689 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -21,13 +21,16 @@ package org.apache.flink.table.plan.rules.datastream
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamWindowJoin
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.join.WindowJoinUtil
 
+import scala.collection.JavaConverters._
+
 class DataStreamWindowJoinRule
   extends ConverterRule(
     classOf[FlinkLogicalJoin],
@@ -39,18 +42,36 @@ class DataStreamWindowJoinRule
     val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
     val joinInfo = join.analyzeCondition
 
-    try {
-      WindowJoinUtil.analyzeTimeBoundary(
-        joinInfo.getRemaining(join.getCluster.getRexBuilder),
-        join.getLeft.getRowType.getFieldCount,
-        new RowSchema(join.getRowType),
-        join.getCluster.getRexBuilder,
-        TableConfig.DEFAULT)
-      true
-    } catch {
-      case _: TableException =>
+    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+      joinInfo.getRemaining(join.getCluster.getRexBuilder),
+      join.getLeft.getRowType.getFieldCount,
+      join.getRowType,
+      join.getCluster.getRexBuilder,
+      TableConfig.DEFAULT)
+
+    // remaining predicate must not access time attributes
+    val remainingPredsAccessTime = remainingPreds.isDefined &&
+      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
+
+    if (windowBounds.isDefined) {
+      if (windowBounds.get.isEventTime) {
+        // we cannot handle event-time window joins yet
         false
+      } else {
+        // Check that no event-time attributes are in the input.
+        // The proc-time join implementation does not ensure that record timestamps are correct.
+        // It is always the timestamp of the later arriving record.
+        // We rely on projection pushdown to remove unused attributes before the join.
+        val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+          .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+        !remainingPredsAccessTime && !rowTimeAttrInOutput
+      }
+    } else {
+      // the given join does not have valid window bounds. We cannot translate it.
+      false
     }
+
   }
 
   override def convert(rel: RelNode): RelNode = {
@@ -63,11 +84,11 @@ class DataStreamWindowJoinRule
     val leftRowSchema = new RowSchema(convLeft.getRowType)
     val rightRowSchema = new RowSchema(convRight.getRowType)
 
-    val (isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition) =
-      WindowJoinUtil.analyzeTimeBoundary(
+    val (windowBounds, remainCondition) =
+      WindowJoinUtil.extractWindowBoundsFromPredicate(
         joinInfo.getRemaining(join.getCluster.getRexBuilder),
         leftRowSchema.logicalArity,
-        new RowSchema(join.getRowType),
+        join.getRowType,
         join.getCluster.getRexBuilder,
         TableConfig.DEFAULT)
 
@@ -81,9 +102,9 @@ class DataStreamWindowJoinRule
       leftRowSchema,
       rightRowSchema,
       new RowSchema(rel.getRowType),
-      isRowTime,
-      leftLowerBoundary,
-      leftUpperBoundary,
+      windowBounds.get.isEventTime,
+      windowBounds.get.leftLowerBound,
+      windowBounds.get.leftUpperBound,
       remainCondition,
       description)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
new file mode 100644
index 0000000..6a160f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.datastream._
+
+import _root_.scala.collection.JavaConverters._
+
+object UpdatingPlanChecker {
+
+  /** Validates that the plan produces only append changes. */
+  def isAppendOnly(plan: RelNode): Boolean = {
+    val appendOnlyValidator = new AppendOnlyValidator
+    appendOnlyValidator.go(plan)
+
+    appendOnlyValidator.isAppendOnly
+  }
+
+  /** Extracts the unique keys of the table produced by the plan. */
+  def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+    val keyExtractor = new UniqueKeyExtractor
+    keyExtractor.go(plan)
+    keyExtractor.keys
+  }
+
+  private class AppendOnlyValidator extends RelVisitor {
+
+    var isAppendOnly = true
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case s: DataStreamRel if s.producesUpdates =>
+          isAppendOnly = false
+        case _ =>
+          super.visit(node, ordinal, parent)
+      }
+    }
+  }
+
+  /** Identifies unique key fields in the output of a RelNode. */
+  private class UniqueKeyExtractor extends RelVisitor {
+
+    var keys: Option[Array[String]] = None
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case c: DataStreamCalc =>
+          super.visit(node, ordinal, parent)
+          // check if input has keys
+          if (keys.isDefined) {
+            // track keys forward
+            val inNames = c.getInput.getRowType.getFieldNames
+            val inOutNames = c.getProgram.getNamedProjects.asScala
+              .map(p => {
+                c.getProgram.expandLocalRef(p.left) match {
+                  // output field is forwarded input field
+                  case i: RexInputRef => (i.getIndex, p.right)
+                  // output field is renamed input field
+                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+                    a.getOperands.get(0) match {
+                      case ref: RexInputRef =>
+                        (ref.getIndex, p.right)
+                      case _ =>
+                        (-1, p.right)
+                    }
+                  // output field is not forwarded from input
+                  case _: RexNode => (-1, p.right)
+                }
+              })
+              // filter all non-forwarded fields
+              .filter(_._1 >= 0)
+              // resolve names of input fields
+              .map(io => (inNames.get(io._1), io._2))
+
+            // filter by input keys
+            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+            // check if all keys have been preserved
+            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+              // all keys have been preserved (but possibly renamed)
+              keys = Some(outKeys.toArray)
+            } else {
+              // some (or all) keys have been removed. Keys are no longer unique and are dropped
+              keys = None
+            }
+          }
+        case _: DataStreamOverAggregate =>
+          super.visit(node, ordinal, parent)
+        // keys are always forwarded by Over aggregate
+        case a: DataStreamGroupAggregate =>
+          // get grouping keys
+          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+          keys = Some(groupKeys.toArray)
+        case w: DataStreamGroupWindowAggregate =>
+          // get grouping keys
+          val groupKeys =
+            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+          // get window start and end time
+          val windowStartEnd = w.getWindowProperties.map(_.name)
+          // we have only a unique key if at least one window property is selected
+          if (windowStartEnd.nonEmpty) {
+            keys = Some(groupKeys ++ windowStartEnd)
+          }
+        case _: DataStreamRel =>
+          // anything else does not forward keys or might duplicate keys, so we can stop
+          keys = None
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
index 97d5ccc..8a3ba43 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -59,24 +59,26 @@ class ProcTimeWindowInnerJoin(
 
   private var cRowWrapper: CRowWrappingCollector = _
 
-  /** other condition function **/
+  // other condition function
   private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
 
-  /** tmp list to store expired records **/
-  private var listToRemove: JList[Long] = _
+  // tmp list to store expired records
+  private var removeList: JList[Long] = _
 
-  /** state to hold left stream element **/
+  // state to hold left stream element
   private var row1MapState: MapState[Long, JList[Row]] = _
-  /** state to hold right stream element **/
+  // state to hold right stream element
   private var row2MapState: MapState[Long, JList[Row]] = _
 
-  /** state to record last timer of left stream, 0 means no timer **/
+  // state to record last timer of left stream, 0 means no timer
   private var timerState1: ValueState[Long] = _
-  /** state to record last timer of right stream, 0 means no timer **/
+  // state to record last timer of right stream, 0 means no timer
   private var timerState2: ValueState[Long] = _
 
-  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
-  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
+  // compute window sizes, i.e., how long to keep rows in state.
+  // window size of -1 means rows do not need to be put into state.
+  private val leftStreamWinSize: Long = if (leftLowerBound <= 0) -leftLowerBound else -1
+  private val rightStreamWinSize: Long = if (leftUpperBound >= 0) leftUpperBound else -1
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
@@ -90,7 +92,7 @@ class ProcTimeWindowInnerJoin(
     LOG.debug("Instantiating JoinFunction.")
     joinFunction = clazz.newInstance()
 
-    listToRemove = new util.ArrayList[Long]()
+    removeList = new util.ArrayList[Long]()
     cRowWrapper = new CRowWrappingCollector()
     cRowWrapper.setChange(true)
 
@@ -140,7 +142,7 @@ class ProcTimeWindowInnerJoin(
       row2MapState,
       -leftUpperBound,     // right stream lower
       -leftLowerBound,     // right stream upper
-      true
+      isLeft = true
     )
   }
 
@@ -165,9 +167,9 @@ class ProcTimeWindowInnerJoin(
       timerState2,
       row2MapState,
       row1MapState,
-      leftLowerBound,    // left stream upper
+      leftLowerBound,    // left stream lower
       leftUpperBound,    // left stream upper
-      false
+      isLeft = false
     )
   }
 
@@ -217,66 +219,82 @@ class ProcTimeWindowInnerJoin(
       winSize: Long,
       timerState: ValueState[Long],
       rowMapState: MapState[Long, JList[Row]],
-      oppoRowMapState: MapState[Long, JList[Row]],
-      oppoLowerBound: Long,
-      oppoUpperBound: Long,
+      otherRowMapState: MapState[Long, JList[Row]],
+      otherLowerBound: Long,
+      otherUpperBound: Long,
       isLeft: Boolean): Unit = {
 
     cRowWrapper.out = out
 
-    val value = valueC.row
+    val row = valueC.row
 
     val curProcessTime = ctx.timerService.currentProcessingTime
-    val oppoLowerTime = curProcessTime + oppoLowerBound
-    val oppoUpperTime = curProcessTime + oppoUpperBound
-
-    // only when windowsize != 0, we need to store the element
-    if (winSize != 0) {
-      // register a timer to expire the element
-      if (timerState.value == 0) {
-        ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
-        timerState.update(curProcessTime + winSize + 1)
-      }
+    val otherLowerTime = curProcessTime + otherLowerBound
+    val otherUpperTime = curProcessTime + otherUpperBound
 
+    if (winSize >= 0) {
+      // put row into state for later joining.
+      // (winSize == 0) joins rows received in the same millisecond.
       var rowList = rowMapState.get(curProcessTime)
       if (rowList == null) {
         rowList = new util.ArrayList[Row]()
       }
-      rowList.add(value)
+      rowList.add(row)
       rowMapState.put(curProcessTime, rowList)
 
+      // register a timer to remove the row from state once it is expired
+      if (timerState.value == 0) {
+        val cleanupTime = curProcessTime + winSize + 1
+        ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+        timerState.update(cleanupTime)
+      }
     }
 
-    // loop the other stream elements
-    val oppositeKeyIter = oppoRowMapState.keys().iterator()
-    while (oppositeKeyIter.hasNext) {
-      val eleTime = oppositeKeyIter.next()
-      if (eleTime < oppoLowerTime) {
-        listToRemove.add(eleTime)
-      } else if (eleTime <= oppoUpperTime) {
-        val oppoRowList = oppoRowMapState.get(eleTime)
-        var i = 0
-        if (isLeft) {
-          while (i < oppoRowList.size) {
-            joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
+    // join row with rows received from the other input
+    val otherTimeIter = otherRowMapState.keys().iterator()
+    if (isLeft) {
+      // go over all timestamps in the other input's state
+      while (otherTimeIter.hasNext) {
+        val otherTimestamp = otherTimeIter.next()
+        if (otherTimestamp < otherLowerTime) {
+          // other timestamp is expired. Remove it later.
+          removeList.add(otherTimestamp)
+        } else if (otherTimestamp <= otherUpperTime) {
+          // join row with all rows from the other input for this timestamp
+          val otherRows = otherRowMapState.get(otherTimestamp)
+          var i = 0
+          while (i < otherRows.size) {
+            joinFunction.join(row, otherRows.get(i), cRowWrapper)
             i += 1
           }
-        } else {
-          while (i < oppoRowList.size) {
-            joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
+        }
+      }
+    } else {
+      // go over all timestamps in the other input's state
+      while (otherTimeIter.hasNext) {
+        val otherTimestamp = otherTimeIter.next()
+        if (otherTimestamp < otherLowerTime) {
+          // other timestamp is expired. Remove it later.
+          removeList.add(otherTimestamp)
+        } else if (otherTimestamp <= otherUpperTime) {
+          // join row with all rows from the other input for this timestamp
+          val otherRows = otherRowMapState.get(otherTimestamp)
+          var i = 0
+          while (i < otherRows.size) {
+            joinFunction.join(otherRows.get(i), row, cRowWrapper)
             i += 1
           }
         }
       }
     }
 
-    // expire records out-of-time
-    var i = listToRemove.size - 1
+    // remove rows for expired timestamps
+    var i = removeList.size - 1
     while (i >= 0) {
-      oppoRowMapState.remove(listToRemove.get(i))
+      otherRowMapState.remove(removeList.get(i))
       i -= 1
     }
-    listToRemove.clear()
+    removeList.clear()
   }
 
   /**
@@ -292,32 +310,34 @@ class ProcTimeWindowInnerJoin(
 
     val expiredTime = curTime - winSize
     val keyIter = rowMapState.keys().iterator()
-    var nextTimer: Long = 0
+    var validTimestamp: Boolean = false
     // Search for expired timestamps.
     // If we find a non-expired timestamp, remember the timestamp and leave the loop.
     // This way we find all expired timestamps if they are sorted without doing a full pass.
-    while (keyIter.hasNext && nextTimer == 0) {
+    while (keyIter.hasNext && !validTimestamp) {
       val recordTime = keyIter.next
       if (recordTime < expiredTime) {
-        listToRemove.add(recordTime)
+        removeList.add(recordTime)
       } else {
-        nextTimer = recordTime
+        // we found a timestamp that is still valid
+        validTimestamp = true
       }
     }
 
     // Remove expired records from state
-    var i = listToRemove.size - 1
+    var i = removeList.size - 1
     while (i >= 0) {
-      rowMapState.remove(listToRemove.get(i))
+      rowMapState.remove(removeList.get(i))
       i -= 1
     }
-    listToRemove.clear()
+    removeList.clear()
 
     // If the state has non-expired timestamps, register a new timer.
     // Otherwise clean the complete state for this input.
-    if (nextTimer != 0) {
-      ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
-      timerState.update(nextTimer + winSize + 1)
+    if (validTimestamp) {
+      val cleanupTime = curTime + winSize + 1
+      ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+      timerState.update(cleanupTime)
     } else {
       timerState.clear()
       rowMapState.clear()

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index fabeeba..c38d915 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.runtime.join
 
-import java.math.{BigDecimal => JBigDecimal}
 import java.util
 
 import org.apache.calcite.plan.RelOptUtil
@@ -27,263 +26,362 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlKind
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
 /**
-  * An util class to help analyze and build join code .
+  * A utility class to optimize and generate window joins.
   */
 object WindowJoinUtil {
 
+  case class WindowBounds(isEventTime: Boolean, leftLowerBound: Long, leftUpperBound: Long)
+
+  protected case class WindowBound(bound: Long, isLeftLower: Boolean)
+  protected case class TimePredicate(
+    isEventTime: Boolean,
+    leftInputOnLeftSide: Boolean,
+    pred: RexCall)
+  protected case class TimeAttributeAccess(isEventTime: Boolean, isLeftInput: Boolean)
+
   /**
-    * Analyze time-condtion to get time boundary for each stream and get the time type
-    * and return remain condition.
+    * Extracts the window bounds from a join predicate.
+    *
+    * @param  predicate           join predicate
+    * @param  leftLogicalFieldCnt number of attributes on the left join input
+    * @param  inputSchema         schema of the join result
+    * @param  rexBuilder          RexBuilder
+    * @param  config              TableConfig
     *
-    * @param  condition           join condition
-    * @param  leftLogicalFieldCnt left stream logical field num
-    * @param  inputSchema         join rowtype schema
-    * @param  rexBuilder          util to build rexNode
-    * @param  config              table environment config
-    * @return isRowTime, left lower boundary, right lower boundary, remain condition
+    * @return A Tuple2 of extracted window bounds and remaining predicates.
     */
-  private[flink] def analyzeTimeBoundary(
-      condition: RexNode,
+  private[flink] def extractWindowBoundsFromPredicate(
+      predicate: RexNode,
       leftLogicalFieldCnt: Int,
-      inputSchema: RowSchema,
+      inputSchema: RelDataType,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+      config: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
 
     // Converts the condition to conjunctive normal form (CNF)
-    val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+    val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
 
     // split the condition into time indicator condition and other condition
-    val (timeTerms, remainTerms) = cnfCondition match {
-      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
-        c.getOperands.asScala
-          .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
-          .reduceLeft((l, r) => {
-            (l._1 ++ r._1, l._2 ++ r._2)
-          })
-      case _ =>
-        throw new TableException("A time-based stream join requires exactly " +
-          "two join predicates that bound the time in both directions.")
+    val (timePreds, otherPreds) = cnfCondition match {
+        // We need at least two comparison predicates for a properly bounded window join.
+        // So we need an AND expression for a valid window join.
+        case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+          c.getOperands.asScala
+            .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+            .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+              analyzed match {
+                case Left(timePred) => (preds._1 :+ timePred, preds._2)
+                case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+              }
+            })
+        case _ =>
+          // No valid window bounds. A windowed stream join requires two comparison predicates that
+          // bound the time in both directions.
+          return (None, Some(predicate))
     }
 
-    if (timeTerms.size != 2) {
-      throw new TableException("A time-based stream join requires exactly " +
-        "two join predicates that bound the time in both directions.")
+    if (timePreds.size != 2) {
+      // No valid window bounds. A windowed stream join requires two comparison predicates that
+      // bound the time in both directions.
+      return (None, Some(predicate))
     }
 
-    // extract time offset from the time indicator conditon
-    val streamTimeOffsets =
-    timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, config))
-
+    // assemble window bounds from predicates
+    val streamTimeOffsets = timePreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
     val (leftLowerBound, leftUpperBound) =
       streamTimeOffsets match {
-        case Seq((x, true), (y, false)) => (x, y)
-        case Seq((x, false), (y, true)) => (y, x)
+        case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+          (x.bound, y.bound)
+        case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+          (y.bound, x.bound)
         case _ =>
-          throw new TableException(
-            "Time-based join conditions must reference the time attribute of both input tables.")
+          // Window join requires two comparison predicates that bound the time in both directions.
+          return (None, Some(predicate))
       }
 
     // compose the remain condition list into one condition
     val remainCondition =
-    remainTerms match {
-      case Seq() => None
+    otherPreds match {
+      case Seq() =>
+        None
       case _ =>
-        // Converts logical field references to physical ones.
-        Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
-          RelOptUtil.andJoinFilters(rexBuilder, l, r)
-        }))
+        Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
     }
 
-    val isRowTime: Boolean = timeTerms(0)._1 match {
-      case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
-      case _ => true
-    }
-    (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
+    val bounds = Some(WindowBounds(timePreds.head.isEventTime, leftLowerBound, leftUpperBound))
+
+    (bounds, remainCondition)
   }
 
   /**
-    * Split the join conditions into time condition and non-time condition
+    * Analyzes a predicate and identifies whether it is a valid predicate for a window join.
+    * A valid window join predicate is a comparison predicate (<, <=, >=, >) that accesses
+    * time attributes of both inputs, each input on a different side of the condition.
+    * Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
+    *
+    * Examples:
+    * - left.rowtime > right.rowtime + 2.minutes => valid
+    * - left.proctime < right.rowtime + 2.minutes => invalid: different time type
+    * - left.rowtime == right.rowtime + 2.minutes => invalid: not a comparison predicate
+    * - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
     *
-    * @return (Seq(timeTerms), Seq(remainTerms)),
+    * If the predicate is a regular join predicate, i.e., it accesses no time attribute, it is
+    * returned as well.
+    *
+    * @return Either a valid time predicate (Left) or a valid non-time predicate (Right)
     */
-  private def analyzeCondtionTermType(
-      conditionTerm: RexNode,
+  private def identifyTimePredicate(
+      pred: RexNode,
       leftFieldCount: Int,
-      inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], Seq[RexNode]) = {
-
-    conditionTerm match {
-      case c: RexCall if Seq(SqlKind.GREATER_THAN, SqlKind.GREATER_THAN_OR_EQUAL,
-        SqlKind.LESS_THAN, SqlKind.LESS_THAN_OR_EQUAL).contains(c.getKind) =>
-        val timeIndicators = extractTimeIndicatorAccesses(c, leftFieldCount, inputType)
-        timeIndicators match {
-          case Seq() =>
-            (Seq(), Seq(c))
-          case Seq(v1, v2) =>
-            if (v1._1 != v2._1) {
-              throw new TableException(
-                "Both time attributes in a join condition must be of the same type.")
+      inputType: RelDataType): Either[TimePredicate, RexNode] = {
+
+    pred match {
+      case c: RexCall =>
+        c.getKind match {
+          case SqlKind.GREATER_THAN |
+               SqlKind.GREATER_THAN_OR_EQUAL |
+               SqlKind.LESS_THAN |
+               SqlKind.LESS_THAN_OR_EQUAL =>
+
+            val leftTerm = c.getOperands.get(0)
+            val rightTerm = c.getOperands.get(1)
+
+            // validate that both sides of the condition do not access non-time attributes
+            if (accessesNonTimeAttribute(leftTerm, inputType) ||
+                accessesNonTimeAttribute(rightTerm, inputType)) {
+              return Right(pred)
             }
-            if (v1._2 == v2._2) {
-              throw new TableException("Time-based join conditions " +
-                "must reference the time attribute of both input tables.")
+
+            // get time attribute on left side of comparison
+            val leftTimeAttrAccess =
+              extractTimeAttributeAccesses(leftTerm, leftFieldCount, inputType) match {
+                case Seq() => None
+                case Seq(a) => Some(a)
+                case _ =>
+                  // Window join predicate may only access a single time attribute on each side.
+                  return Right(pred)
+              }
+
+            // get time attribute on right side of comparison
+            val rightTimeAccess =
+              extractTimeAttributeAccesses(rightTerm, leftFieldCount, inputType) match {
+                case Seq() => None
+                case Seq(a) => Some(a)
+                case _ =>
+                  // Window join predicate may only access a single time attribute on each side.
+                  return Right(pred)
+              }
+
+            // check if both sides of the condition access a time attribute,
+            // if both join inputs are accessed, and
+            // if both accesses are on the same time type (event-time or proc-time)
+            (leftTimeAttrAccess, rightTimeAccess) match {
+              case (None, None) =>
+                // neither side accesses a time attribute. This is a regular join predicate.
+                Right(c)
+              case (Some(_), None) | (None, Some(_)) =>
+                // Both sides of a window join predicate must access a time attribute.
+                Right(pred)
+              case (Some(left), Some(right)) if left.isEventTime != right.isEventTime =>
+                // Both time attributes in a window join predicate must be of the same type.
+                Right(pred)
+              case (Some(left), Some(right)) if left.isLeftInput == right.isLeftInput =>
+                // Window join predicates must reference the time attribute of both inputs.
+                Right(pred)
+              case (Some(left), Some(_)) =>
+                Left(TimePredicate(left.isEventTime, left.isLeftInput, c))
             }
-            (Seq((v1._1, v1._2, c)), Seq())
-          case _ =>
-            throw new TableException(
-              "Time-based join conditions must reference the time attribute of both input tables.")
+          // not a comparison predicate.
+          case _ => Right(pred)
         }
       case other =>
-        val timeIndicators = extractTimeIndicatorAccesses(other, leftFieldCount, inputType)
-        timeIndicators match {
-          case Seq() =>
-            (Seq(), Seq(other))
-          case _ =>
-            throw new TableException("Time indicators can not be used in non time-condition.")
-        }
+        Right(other)
     }
   }
 
   /**
-    * Extracts all time indicator attributes that are accessed in an expression.
+    * Extracts all time attributes that are accessed in an expression.
     *
-    * @return seq(timeType, is left input time indicator)
+    * @return A Seq of all time attributes accessed in the expression.
     */
-  def extractTimeIndicatorAccesses(
-      expression: RexNode,
+  def extractTimeAttributeAccesses(
+      expr: RexNode,
       leftFieldCount: Int,
-      inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
+      inputType: RelDataType): Seq[TimeAttributeAccess] = {
 
-    expression match {
+    expr match {
       case i: RexInputRef =>
+        // check if time attribute is accessed and from which input side
         val idx = i.getIndex
         inputType.getFieldList.get(idx).getType match {
-          case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
-            // left table time indicator
-            Seq((t, true))
           case t: TimeIndicatorRelDataType =>
-            // right table time indicator
-            Seq((t, false))
-          case _ => Seq()
+            // time attribute access. Remember time type and side of input
+            val isLeftInput = idx < leftFieldCount
+            Seq(TimeAttributeAccess(t.isEventTime, isLeftInput))
+          case _ =>
+            // not a time attribute access.
+            Seq()
         }
       case c: RexCall =>
+        // concat time-attributes of all operands
         c.operands.asScala
-          .map(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
+          .map(extractTimeAttributeAccesses(_, leftFieldCount, inputType))
           .reduce(_ ++ _)
       case _ => Seq()
     }
   }
 
   /**
+    * Checks if an expression accesses a time attribute.
+    *
+    * @param expr The expression to check.
+    * @param inputType The input type of the expression.
+    * @return True, if the expression accesses a time attribute. False otherwise.
+    */
+  def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+    expr match {
+      case i: RexInputRef =>
+        val accessedType = inputType.getFieldList.get(i.getIndex).getType
+        accessedType match {
+          case _: TimeIndicatorRelDataType => true
+          case _ => false
+        }
+      case c: RexCall =>
+        c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
+      case _ => false
+    }
+  }
+
+  /**
+    * Checks if an expression accesses a non-time attribute.
+    *
+    * @param expr The expression to check.
+    * @param inputType The input type of the expression.
+    * @return True, if the expression accesses a non-time attribute. False otherwise.
+    */
+  def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+    expr match {
+      case i: RexInputRef =>
+        val accessedType = inputType.getFieldList.get(i.getIndex).getType
+        accessedType match {
+          case _: TimeIndicatorRelDataType => false
+          case _ => true
+        }
+      case c: RexCall =>
+        c.operands.asScala.exists(accessesNonTimeAttribute(_, inputType))
+      case _ => false
+    }
+  }
+
+  /**
     * Computes the absolute bound on the left operand of a comparison expression and
     * whether the bound is an upper or lower bound.
     *
     * @return window boundary, is left lower bound
     */
-  def extractTimeOffsetFromCondition(
-      timeTerm: RexNode,
-      isLeftExprBelongLeftTable: Boolean,
+  def computeWindowBoundFromPredicate(
+      timePred: TimePredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Long, Boolean) = {
-
-    val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
+      config: TableConfig): Option[WindowBound] = {
 
     val isLeftLowerBound: Boolean =
-      timeTerm.getKind match {
-        // e.g a.proctime > b.proctime - 5 sec, then it's the lower bound of a and the value is -5
-        // e.g b.proctime > a.proctime - 5 sec, then it's not the lower bound of a but upper bound
-        case kind@(SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
-          isLeftExprBelongLeftTable
-        // e.g a.proctime < b.proctime + 5 sec, the the upper bound of a is 5
-        case kind@(SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
-          !isLeftExprBelongLeftTable
-        case _ =>
-          throw new TableException("Unsupported time-condition.")
+      timePred.pred.getKind match {
+        case (SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
+          timePred.leftInputOnLeftSide
+        case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
+          !timePred.leftInputOnLeftSide
       }
 
-    val (leftLiteral, rightLiteral) =
-      reduceTimeExpression(
-        timeCall.operands.get(0),
-        timeCall.operands.get(1),
-        rexBuilder,
-        config)
-    val tmpTimeOffset: Long =
-      if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else leftLiteral - rightLiteral
-
-    val boundary =
-      tmpTimeOffset.signum * (
-        if (timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == SqlKind.GREATER_THAN) {
-          tmpTimeOffset.abs - 1
-        } else {
-          tmpTimeOffset.abs
-        })
-
-    (boundary, isLeftLowerBound)
+    // reduce predicate to constants to compute bounds
+    val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, config)
+
+    if (leftLiteral.isEmpty || rightLiteral.isEmpty) {
+      return None
+    }
+
+    // compute boundary
+    val tmpTimeOffset: Long = if (timePred.leftInputOnLeftSide) {
+      rightLiteral.get - leftLiteral.get
+    } else {
+      leftLiteral.get - rightLiteral.get
+    }
+    val boundary = timePred.pred.getKind match {
+      case SqlKind.LESS_THAN =>
+        tmpTimeOffset - 1
+      case SqlKind.GREATER_THAN =>
+        tmpTimeOffset + 1
+      case _ =>
+        tmpTimeOffset
+    }
+
+    Some(WindowBound(boundary, isLeftLowerBound))
   }
 
   /**
-    * Calculates the time boundary by replacing the time attribute by a zero literal
-    * and reducing the expression.
-    * For example:
-    * b.proctime - interval '1' second - interval '2' second will be translated to
-    * 0 - 1000 - 2000
+    * Replaces the time attributes on both sides of a time predicate by a zero literal and
+    * reduces the expressions on both sides to a long literal.
+    *
+    * @param timePred The time predicate whose two sides are reduced.
+    * @param rexBuilder A RexBuilder
+    * @param config A TableConfig.
+    * @return The values of the reduced literals on both sides of the time comparison predicate.
     */
   private def reduceTimeExpression(
-      leftRexNode: RexNode,
-      rightRexNode: RexNode,
+      timePred: TimePredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Long, Long) = {
+      config: TableConfig): (Option[Long], Option[Long]) = {
 
     /**
-      * replace the rowtime/proctime with zero literal.
+      * Replaces the time attribute by a zero literal.
       */
     def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
       expr match {
         case c: RexCall =>
           // replace in call operands
-          val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral(_)).asJava
+          val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral).asJava
           rexBuilder.makeCall(c.getType, c.getOperator, newOps)
         case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) =>
           // replace with timestamp
           rexBuilder.makeZeroLiteral(expr.getType)
-        case _: RexInputRef =>
-          throw new TableException("Time join condition may only reference time indicator fields.")
         case _ => expr
       }
     }
 
-    val literalLeftRex = replaceTimeFieldWithLiteral(leftRexNode)
-    val literalRightRex = replaceTimeFieldWithLiteral(rightRexNode)
+    val leftSide = timePred.pred.operands.get(0)
+    val rightSide = timePred.pred.operands.get(1)
 
+    val leftSideWithLiteral = replaceTimeFieldWithLiteral(leftSide)
+    val rightSideWithLiteral = replaceTimeFieldWithLiteral(rightSide)
+
+    // reduce expression to literal
     val exprReducer = new ExpressionReducer(config)
     val originList = new util.ArrayList[RexNode]()
-    originList.add(literalLeftRex)
-    originList.add(literalRightRex)
+    originList.add(leftSideWithLiteral)
+    originList.add(rightSideWithLiteral)
     val reduceList = new util.ArrayList[RexNode]()
     exprReducer.reduce(rexBuilder, originList, reduceList)
 
-    val literals = reduceList.asScala.map(f => f match {
+    // extract bounds from reduced literal
+    val literals = reduceList.asScala.map {
       case literal: RexLiteral =>
-        literal.getValue2.asInstanceOf[Long]
+        Some(literal.getValue2.asInstanceOf[Long])
       case _ =>
-        throw TableException(
-          "Time condition may only consist of time attributes, literals, and arithmetic operators.")
-    })
+        None
+    }
 
-    (literals(0), literals(1))
+    (literals.head, literals(1))
   }
 
-
   /**
-    * Generate other non-equi condition function
+    * Generates a JoinFunction that applies additional join predicates and projects the result.
     *
     * @param  config          table env config
     * @param  joinType        join type to determain whether input can be null
@@ -300,7 +398,7 @@ object WindowJoinUtil {
       rightType: TypeInformation[Row],
       returnType: RowSchema,
       otherCondition: Option[RexNode],
-      ruleDescription: String) = {
+      ruleDescription: String): GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row] = {
 
     // whether input can be null
     val nullCheck = joinType match {
@@ -311,7 +409,7 @@ object WindowJoinUtil {
     }
 
     // generate other non-equi function code
-    val generator = new CodeGenerator(
+    val generator = new FunctionCodeGenerator(
       config,
       nullCheck,
       leftType,
@@ -329,7 +427,9 @@ object WindowJoinUtil {
            |${generator.collectorTerm}.collect(${conversion.resultTerm});
            |""".stripMargin
       case Some(remainCondition) =>
-        val genCond = generator.generateExpression(remainCondition)
+        // map logical field accesses to physical accesses
+        val physicalCondition = returnType.mapRexNode(remainCondition)
+        val genCond = generator.generateExpression(physicalCondition)
         s"""
            |${genCond.code}
            |if (${genCond.resultTerm}) {

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
deleted file mode 100644
index 769ba55..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.updateutils
-
-import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.plan.nodes.datastream._
-import _root_.scala.collection.JavaConverters._
-
-object UpdateCheckUtils {
-
-  /** Validates that the plan produces only append changes. */
-  def isAppendOnly(plan: RelNode): Boolean = {
-    val appendOnlyValidator = new AppendOnlyValidator
-    appendOnlyValidator.go(plan)
-
-    appendOnlyValidator.isAppendOnly
-  }
-
-  /** Extracts the unique keys of the table produced by the plan. */
-  def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
-    val keyExtractor = new UniqueKeyExtractor
-    keyExtractor.go(plan)
-    keyExtractor.keys
-  }
-
-  private class AppendOnlyValidator extends RelVisitor {
-
-    var isAppendOnly = true
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case s: DataStreamRel if s.producesUpdates =>
-          isAppendOnly = false
-        case _ =>
-          super.visit(node, ordinal, parent)
-      }
-    }
-  }
-
-  /** Identifies unique key fields in the output of a RelNode. */
-  private class UniqueKeyExtractor extends RelVisitor {
-
-    var keys: Option[Array[String]] = None
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case c: DataStreamCalc =>
-          super.visit(node, ordinal, parent)
-          // check if input has keys
-          if (keys.isDefined) {
-            // track keys forward
-            val inNames = c.getInput.getRowType.getFieldNames
-            val inOutNames = c.getProgram.getNamedProjects.asScala
-              .map(p => {
-                c.getProgram.expandLocalRef(p.left) match {
-                  // output field is forwarded input field
-                  case i: RexInputRef => (i.getIndex, p.right)
-                  // output field is renamed input field
-                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
-                    a.getOperands.get(0) match {
-                      case ref: RexInputRef =>
-                        (ref.getIndex, p.right)
-                      case _ =>
-                        (-1, p.right)
-                    }
-                  // output field is not forwarded from input
-                  case _: RexNode => (-1, p.right)
-                }
-              })
-              // filter all non-forwarded fields
-              .filter(_._1 >= 0)
-              // resolve names of input fields
-              .map(io => (inNames.get(io._1), io._2))
-
-            // filter by input keys
-            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
-            // check if all keys have been preserved
-            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
-              // all key have been preserved (but possibly renamed)
-              keys = Some(outKeys.toArray)
-            } else {
-              // some (or all) keys have been removed. Keys are no longer unique and removed
-              keys = None
-            }
-          }
-        case _: DataStreamOverAggregate =>
-          super.visit(node, ordinal, parent)
-        // keys are always forwarded by Over aggregate
-        case a: DataStreamGroupAggregate =>
-          // get grouping keys
-          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
-          keys = Some(groupKeys.toArray)
-        case w: DataStreamGroupWindowAggregate =>
-          // get grouping keys
-          val groupKeys =
-            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
-          // get window start and end time
-          val windowStartEnd = w.getWindowProperties.map(_.name)
-          // we have only a unique key if at least one window property is selected
-          if (windowStartEnd.nonEmpty) {
-            keys = Some(groupKeys ++ windowStartEnd)
-          }
-        case _: DataStreamRel =>
-          // anything else does not forward keys or might duplicate key, so we can stop
-          keys = None
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
deleted file mode 100644
index d4ff3f7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit._
-
-import scala.collection.mutable
-
-class JoinITCase extends StreamingWithStateTestBase {
-
-  val data = List(
-    (1L, 1, "Hello"),
-    (2L, 2, "Hello"),
-    (3L, 3, "Hello"),
-    (4L, 4, "Hello"),
-    (5L, 5, "Hello"),
-    (6L, 6, "Hello"),
-    (7L, 7, "Hello World"),
-    (8L, 8, "Hello World"),
-    (20L, 20, "Hello World"))
-
-  /** test process time inner join **/
-  @Test
-  def testProcessTimeInnerJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
-      "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
-
-    val data1 = new mutable.MutableList[(Int, Long, String)]
-    data1.+=((1, 1L, "Hi1"))
-    data1.+=((1, 2L, "Hi2"))
-    data1.+=((1, 5L, "Hi3"))
-    data1.+=((2, 7L, "Hi5"))
-    data1.+=((1, 9L, "Hi6"))
-    data1.+=((1, 8L, "Hi8"))
-
-    val data2 = new mutable.MutableList[(Int, Long, String)]
-    data2.+=((1, 1L, "HiHi"))
-    data2.+=((2, 2L, "HeHe"))
-
-    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-  }
-
-  /** test process time inner join with other condition **/
-  @Test
-  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
-      "t1.proctime between t2.proctime - interval '5' second " +
-      "and t2.proctime + interval '5' second " +
-      "and t1.b > t2.b and t1.b + t2.b < 14"
-
-    val data1 = new mutable.MutableList[(String, Long, String)]
-    data1.+=(("1", 1L, "Hi1"))
-    data1.+=(("1", 2L, "Hi2"))
-    data1.+=(("1", 5L, "Hi3"))
-    data1.+=(("2", 7L, "Hi5"))
-    data1.+=(("1", 9L, "Hi6"))
-    data1.+=(("1", 8L, "Hi8"))
-
-    val data2 = new mutable.MutableList[(String, Long, String)]
-    data2.+=(("1", 5L, "HiHi"))
-    data2.+=(("2", 2L, "HeHe"))
-
-    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-  }
-
-}
-


[2/3] flink git commit: [FLINK-6232] [table] Add SQL documentation for time window join.

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
deleted file mode 100644
index 15e8b89..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.join.WindowJoinUtil
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Assert._
-import org.junit.Test
-
-class JoinTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
-  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
-
-  @Test
-  def testProcessingTimeInnerJoin() = {
-
-    val sqlQuery = "SELECT t1.a, t2.b " +
-      "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " +
-      "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        binaryNode(
-          "DataStreamWindowJoin",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "proctime")
-          ),
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(1),
-            term("select", "a", "b", "proctime")
-          ),
-          term("where",
-            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
-              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
-          term("join", "a, proctime, a0, b, proctime0"),
-          term("joinType", "InnerJoin")
-        ),
-        term("select", "a", "b")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  /** There should exist time conditions **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinUnExistTimeCondition() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  /** There should exist exactly two time conditions **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinSingleTimeCondition() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
-      " and t1.proctime > t2.proctime - interval '5' second"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  /** Both time attributes in a join condition must be of the same type **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinDiffTimeIndicator() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
-      " and t1.proctime > t2.proctime - interval '5' second " +
-      " and t1.proctime < t2.c + interval '5' second"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  /** The time conditions should be an And condition **/
-  @Test(expected = classOf[TableException])
-  def testWindowJoinNotCnfCondition() = {
-    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
-      " and (t1.proctime > t2.proctime - interval '5' second " +
-      " or t1.proctime < t2.c + interval '5' second)"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  @Test
-  def testJoinTimeBoundary(): Unit = {
-    verifyTimeBoundary(
-      "t1.proctime between t2.proctime - interval '1' hour " +
-        "and t2.proctime + interval '1' hour",
-      -3600000,
-      3600000,
-      "proctime")
-
-    verifyTimeBoundary(
-      "t1.proctime > t2.proctime - interval '1' second and " +
-        "t1.proctime < t2.proctime + interval '1' second",
-      -999,
-      999,
-      "proctime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c - interval '1' second and " +
-        "t1.c <= t2.c + interval '1' second",
-      -1000,
-      1000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c and " +
-        "t1.c <= t2.c + interval '1' second",
-      0,
-      1000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c + interval '1' second and " +
-        "t1.c <= t2.c + interval '10' second",
-      1000,
-      10000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t2.c - interval '1' second <= t1.c and " +
-        "t2.c + interval '10' second >= t1.c",
-      -1000,
-      10000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
-        "interval '10' second and " +
-        "t1.c <= t2.c + interval '10' second",
-      -7000,
-      10000,
-      "rowtime")
-
-    verifyTimeBoundary(
-      "t1.c >= t2.c - interval '10' second and " +
-        "t1.c <= t2.c - interval '5' second",
-      -10000,
-      -5000,
-      "rowtime")
-  }
-
-  @Test
-  def testJoinRemainConditionConvert(): Unit = {
-    streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
-    streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
-    val query =
-      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
-        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
-        "t1.c > t2.c"
-    verifyRemainConditionConvert(
-      query,
-      ">($1, $3)")
-
-    val query1 =
-      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
-        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
-    verifyRemainConditionConvert(
-      query1,
-      "")
-
-    streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
-    streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
-    val query2 =
-      "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
-        "t1.proctime >= t2.proctime - interval '10' second " +
-        "and t1.proctime <= t2.proctime - interval '5' second and " +
-        "t1.c > t2.c"
-    verifyRemainConditionConvert(
-      query2,
-      ">($2, $5)")
-  }
-
-  def verifyTimeBoundary(
-      timeSql: String,
-      expLeftSize: Long,
-      expRightSize: Long,
-      expTimeType: String) = {
-    val query =
-      "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
-
-    val resultTable = streamUtil.tEnv.sql(query)
-    val relNode = resultTable.getRelNode
-    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
-    val rexNode = joinNode.getCondition
-    val (isRowTime, lowerBound, upperBound, conditionWithoutTime) =
-      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
-        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
-    val timeTypeStr =
-      if (isRowTime) "rowtime"
-      else  "proctime"
-    assertEquals(expLeftSize, lowerBound)
-    assertEquals(expRightSize, upperBound)
-    assertEquals(expTimeType, timeTypeStr)
-  }
-
-  def verifyRemainConditionConvert(
-      query: String,
-      expectCondStr: String) = {
-
-    val resultTable = streamUtil.tEnv.sql(query)
-    val relNode = resultTable.getRelNode
-    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
-    val joinInfo = joinNode.analyzeCondition
-    val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
-    val (isRowTime, lowerBound, upperBound, remainCondition) =
-      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
-        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
-    val actual: String = remainCondition.getOrElse("").toString
-
-    assertEquals(expectCondStr, actual)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
new file mode 100644
index 0000000..640fd26
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+  @Test
+  def testProcessingTimeInnerJoinWithOnClause() = {
+
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where",
+            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+          term("join", "a, proctime, a0, b, proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testProcessingTimeInnerJoinWithWhereClause() = {
+
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+        |""".stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where",
+            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+          term("join", "a, proctime, a0, b, proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testJoinTimeBoundary(): Unit = {
+    verifyTimeBoundary(
+      "t1.proctime between t2.proctime - interval '1' hour " +
+        "and t2.proctime + interval '1' hour",
+      -3600000,
+      3600000,
+      "proctime")
+
+    verifyTimeBoundary(
+      "t1.proctime > t2.proctime - interval '1' second and " +
+        "t1.proctime < t2.proctime + interval '1' second",
+      -999,
+      999,
+      "proctime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '1' second and " +
+        "t1.c <= t2.c + interval '1' second",
+      -1000,
+      1000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c and " +
+        "t1.c <= t2.c + interval '1' second",
+      0,
+      1000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c + interval '1' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      1000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t2.c - interval '1' second <= t1.c and " +
+        "t2.c + interval '10' second >= t1.c",
+      -1000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
+        "interval '10' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      -7000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '10' second and " +
+        "t1.c <= t2.c - interval '5' second",
+      -10000,
+      -5000,
+      "rowtime")
+  }
+
+  @Test
+  def testJoinRemainConditionConvert(): Unit = {
+    streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    val query =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
+        "t1.c > t2.c"
+    verifyRemainConditionConvert(
+      query,
+      ">($2, $6)")
+
+    val query1 =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
+    verifyRemainConditionConvert(
+      query1,
+      "")
+
+    streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
+    val query2 =
+      "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
+        "t1.proctime >= t2.proctime - interval '10' second " +
+        "and t1.proctime <= t2.proctime - interval '5' second and " +
+        "t1.c > t2.c"
+    verifyRemainConditionConvert(
+      query2,
+      ">($2, $6)")
+  }
+
+  private def verifyTimeBoundary(
+      timeSql: String,
+      expLeftSize: Long,
+      expRightSize: Long,
+      expTimeType: String): Unit = {
+    val query =
+      "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
+
+    val resultTable = streamUtil.tableEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val rexNode = joinNode.getCondition
+    val (windowBounds, _) =
+      WindowJoinUtil.extractWindowBoundsFromPredicate(
+        rexNode,
+        4,
+        joinNode.getRowType,
+        joinNode.getCluster.getRexBuilder,
+        streamUtil.tableEnv.getConfig)
+
+    val timeTypeStr =
+      if (windowBounds.get.isEventTime) "rowtime"
+      else  "proctime"
+    assertEquals(expLeftSize, windowBounds.get.leftLowerBound)
+    assertEquals(expRightSize, windowBounds.get.leftUpperBound)
+    assertEquals(expTimeType, timeTypeStr)
+  }
+
+  private def verifyRemainConditionConvert(
+      query: String,
+      expectCondStr: String): Unit = {
+
+    val resultTable = streamUtil.tableEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val joinInfo = joinNode.analyzeCondition
+    val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+    val (_, remainCondition) =
+      WindowJoinUtil.extractWindowBoundsFromPredicate(
+        rexNode,
+        4,
+        joinNode.getRowType,
+        joinNode.getCluster.getRexBuilder,
+        streamUtil.tableEnv.getConfig)
+
+    val actual: String = remainCondition.getOrElse("").toString
+
+    assertEquals(expectCondStr, actual)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..9cce37e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class JoinValidationTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+  /** A join without any time condition must be rejected. **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinUnExistTimeCondition() = {
+    val sql =
+      """
+        |SELECT t2.a
+        |FROM MyTable t1 JOIN MyTable2 t2 ON t1.a = t2.a""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Exactly two time conditions are required; a single one must be rejected. **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinSingleTimeCondition() = {
+    val sql =
+      """
+        |SELECT t2.a
+        |FROM MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime > t2.proctime - INTERVAL '5' SECOND""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Both time attributes in a join condition must be of the same type **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinDiffTimeIndicator() = {
+    val sql =
+      """
+        |SELECT t2.a FROM
+        |MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+        |  t1.proctime < t2.c + INTERVAL '5' SECOND""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** The time conditions must be combined with AND, not OR. **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinNotCnfCondition() = {
+    val sql =
+      """
+        |SELECT t2.a
+        |FROM MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.a = t2.a AND
+        |  (t1.proctime > t2.proctime - INTERVAL '5' SECOND OR
+        |   t1.proctime < t2.c + INTERVAL '5' SECOND)""".stripMargin
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Validates that no rowtime attribute is in the output schema **/
+  @Test(expected = classOf[TableException])
+  def testNoRowtimeAttributeInResult(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND t2.proctime
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 7885160..90c8ea4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
       "DataStreamCalc",
       streamTableNode(0),
       term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
-      term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+      term("where", ">(rowtime, 1990-12-02 12:11:11)")
     )
 
     util.verifyTable(result, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index c008ed3..6c24c5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -20,20 +20,18 @@ package org.apache.flink.table.runtime.harness
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.lang.{Integer => JInt}
 
-import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness}
-import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
 import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
 import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.junit.Test
-
+import org.junit.Assert.{assertEquals, assertTrue}
 
 class JoinHarnessTest extends HarnessTestBase{
 
@@ -89,7 +87,7 @@ class JoinHarnessTest extends HarnessTestBase{
        new TupleRowKeySelector[Integer](0),
        new TupleRowKeySelector[Integer](0),
        BasicTypeInfo.INT_TYPE_INFO,
-       1,1,0)
+       1, 1, 0)
 
     testHarness.open()
 
@@ -97,16 +95,16 @@ class JoinHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa"), true), 1))
-    assert(testHarness.numProcessingTimeTimers() == 1)
+    assertEquals(1, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(2: JInt, "bbb"), true), 2))
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(2, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa2"), true), 3))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // right stream input and output normally
     testHarness.processElement2(new StreamRecord(
@@ -114,20 +112,20 @@ class JoinHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(4)
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(2: JInt, "Hello1"), true), 4))
-    assert(testHarness.numKeyedStateEntries() == 8)
-    assert(testHarness.numProcessingTimeTimers() == 4)
+    assertEquals(8, testHarness.numKeyedStateEntries())
+    assertEquals(4, testHarness.numProcessingTimeTimers())
 
     // expired left stream record at timestamp 1
     testHarness.setProcessingTime(12)
-    assert(testHarness.numKeyedStateEntries() == 8)
-    assert(testHarness.numProcessingTimeTimers() == 4)
+    assertEquals(8, testHarness.numKeyedStateEntries())
+    assertEquals(4, testHarness.numProcessingTimeTimers())
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(1: JInt, "Hi2"), true), 12))
 
     // expired right stream record at timestamp 4 and all left stream
     testHarness.setProcessingTime(25)
-    assert(testHarness.numKeyedStateEntries() == 2)
-    assert(testHarness.numProcessingTimeTimers() == 1)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+    assertEquals(1, testHarness.numProcessingTimeTimers())
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa3"), true), 25))
     testHarness.processElement1(new StreamRecord(
@@ -136,9 +134,9 @@ class JoinHarnessTest extends HarnessTestBase{
       CRow(Row.of(2: JInt, "Hello2"), true), 25))
 
     testHarness.setProcessingTime(45)
-    assert(testHarness.numKeyedStateEntries() > 0)
+    assertTrue(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(46)
-    assert(testHarness.numKeyedStateEntries() == 0)
+    assertEquals(0, testHarness.numKeyedStateEntries())
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -175,7 +173,7 @@ class JoinHarnessTest extends HarnessTestBase{
         new TupleRowKeySelector[Integer](0),
         new TupleRowKeySelector[Integer](0),
         BasicTypeInfo.INT_TYPE_INFO,
-        1,1,0)
+        1, 1, 0)
 
     testHarness.open()
 
@@ -188,37 +186,38 @@ class JoinHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa3"), true), 3))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // Do not store b elements
     // not meet a.proctime <= b.proctime - 5
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(1: JInt, "bbb3"), true), 3))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // meet a.proctime <= b.proctime - 5
     testHarness.setProcessingTime(7)
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(2: JInt, "bbb7"), true), 7))
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // expire record of stream a at timestamp 1
     testHarness.setProcessingTime(12)
-    assert(testHarness.numKeyedStateEntries() == 4)
-    assert(testHarness.numProcessingTimeTimers() == 2)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
     testHarness.processElement2(new StreamRecord(
       CRow(Row.of(1: JInt, "bbb12"), true), 12))
 
     testHarness.setProcessingTime(13)
-    assert(testHarness.numKeyedStateEntries() == 2)
-    assert(testHarness.numProcessingTimeTimers() == 1)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+    assertEquals(1, testHarness.numProcessingTimeTimers())
 
-    testHarness.setProcessingTime(14)
-    assert(testHarness.numKeyedStateEntries() == 0)
-    assert(testHarness.numProcessingTimeTimers() == 0)
+    // state must be cleaned after the window timer interval has passed without new rows.
+    testHarness.setProcessingTime(23)
+    assertEquals(0, testHarness.numKeyedStateEntries())
+    assertEquals(0, testHarness.numProcessingTimeTimers())
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
new file mode 100644
index 0000000..ab7925b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  /** Tests the processing-time windowed inner join. **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
+
+    val data1 = new mutable.MutableList[(Int, Long, String)]
+    data1.+=((1, 1L, "Hi1"))
+    data1.+=((1, 2L, "Hi2"))
+    data1.+=((1, 5L, "Hi3"))
+    data1.+=((2, 7L, "Hi5"))
+    data1.+=((1, 9L, "Hi6"))
+    data1.+=((1, 8L, "Hi8"))
+
+    val data2 = new mutable.MutableList[(Int, Long, String)]
+    data2.+=((1, 1L, "HiHi"))
+    data2.+=((2, 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  /** Tests the processing-time windowed inner join with additional non-time predicates. **/
+  @Test
+  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second " +
+      "and t2.proctime + interval '5' second " +
+      "and t1.b > t2.b and t1.b + t2.b < 14"
+
+    val data1 = new mutable.MutableList[(String, Long, String)]
+    data1.+=(("1", 1L, "Hi1"))
+    data1.+=(("1", 2L, "Hi2"))
+    data1.+=(("1", 5L, "Hi3"))
+    data1.+=(("2", 7L, "Hi5"))
+    data1.+=(("1", 9L, "Hi6"))
+    data1.+=(("1", 8L, "Hi8"))
+
+    val data2 = new mutable.MutableList[(String, Long, String)]
+    data2.+=(("1", 5L, "HiHi"))
+    data2.+=(("2", 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+}
+