Posted to commits@flink.apache.org by fh...@apache.org on 2017/07/18 07:40:08 UTC

[3/3] flink git commit: [FLINK-6232] [table] Add SQL documentation for time window join.

[FLINK-6232] [table] Add SQL documentation for time window join.

- Add support for window join predicates in WHERE clause.
- Refactoring of WindowJoinUtil.
- Minor refactorings of join classes.

This closes #4324.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/471345c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/471345c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/471345c0

Branch: refs/heads/master
Commit: 471345c0ea9930a582786cc08bd290d374b28c5a
Parents: ba6c59e
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 13 00:49:30 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 18 00:57:37 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  29 ++
 .../table/api/StreamTableEnvironment.scala      |  10 +-
 .../calcite/RelTimeIndicatorConverter.scala     | 166 ++++----
 .../flink/table/plan/nodes/CommonCalc.scala     |  25 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  12 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   2 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  25 +-
 .../nodes/datastream/DataStreamWindowJoin.scala |  40 +-
 .../datastream/DataStreamWindowJoinRule.scala   |  55 ++-
 .../table/plan/util/UpdatingPlanChecker.scala   | 129 ++++++
 .../runtime/join/ProcTimeWindowInnerJoin.scala  | 134 +++---
 .../table/runtime/join/WindowJoinUtil.scala     | 408 ++++++++++++-------
 .../table/updateutils/UpdateCheckUtils.scala    | 128 ------
 .../table/api/scala/stream/sql/JoinITCase.scala | 117 ------
 .../table/api/scala/stream/sql/JoinTest.scala   | 235 -----------
 .../flink/table/api/stream/sql/JoinTest.scala   | 250 ++++++++++++
 .../sql/validation/JoinValidationTest.scala     |  95 +++++
 .../plan/TimeIndicatorConversionTest.scala      |   2 +-
 .../table/runtime/harness/JoinHarnessTest.scala |  61 ++-
 .../table/runtime/stream/sql/JoinITCase.scala   | 106 +++++
 20 files changed, 1167 insertions(+), 862 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 81aa3d9..e677b60 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -369,6 +369,35 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
       </td>
     </tr>
     <tr>
+      <td><strong>Time-windowed Join</strong><br>
+        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+        <p>A time-windowed join requires a special join condition that bounds the time on both sides. This can be done with either two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply to time predicates:
+          <ul>
+            <li>Time predicates must compare time attributes of both input tables.</li>
+            <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
+            <li>Only range predicates are valid time predicates.</li>
+            <li>Non-time predicates must not access a time attribute.</li>
+          </ul>
+        </p>
+
+        <p><b>Note:</b> Currently, time-windowed joins are only supported for processing time and only as <code>INNER</code> joins.</p>
+
+{% highlight sql %}
+SELECT *
+FROM Orders o, Shipments s
+WHERE o.id = s.orderId AND
+      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+The example above joins all orders with their corresponding shipments if the order was shipped within four hours after it was received.
+      </td>
+    </tr>
+    <tr>
     	<td>
         <strong>Expanding arrays into a relation</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f026824..669b017 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -45,12 +45,12 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
-import org.apache.flink.table.updateutils.UpdateCheckUtils
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
@@ -174,10 +174,10 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
         // check for append only table
-        val isAppendOnlyTable = UpdateCheckUtils.isAppendOnly(optimizedPlan)
+        val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(optimizedPlan)
         upsertSink.setIsAppendOnly(isAppendOnlyTable)
         // extract unique key fields
-        val tableKeys: Option[Array[String]] = UpdateCheckUtils.getUniqueKeyFields(optimizedPlan)
+        val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
         // check that we have keys if the table has changes (is not append-only)
         tableKeys match {
           case Some(keys) => upsertSink.setKeyFields(keys)
@@ -201,7 +201,7 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
         // verify table is an insert-only (append-only) table
-        if (!UpdateCheckUtils.isAppendOnly(optimizedPlan)) {
+        if (!UpdatingPlanChecker.isAppendOnly(optimizedPlan)) {
           throw new TableException(
             "AppendStreamTableSink requires that Table has only insert changes.")
         }
@@ -651,7 +651,7 @@ abstract class StreamTableEnvironment(
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
     // if no change flags are requested, verify table is an insert-only (append-only) table.
-    if (!withChangeFlag && !UpdateCheckUtils.isAppendOnly(logicalPlan)) {
+    if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(logicalPlan)) {
       throw new TableException(
         "Table is not an append-only table. " +
         "Use the toRetractStream() in order to handle add and retract messages.")

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 32d6f01..d76613e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -42,9 +42,9 @@ import scala.collection.mutable
 class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
 
   private val timestamp = rexBuilder
-      .getTypeFactory
-      .asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    .getTypeFactory
+    .asInstanceOf[FlinkTypeFactory]
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
 
   override def visit(intersect: LogicalIntersect): RelNode =
     throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -138,15 +138,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
     // visit children and update inputs
     val input = filter.getInput.accept(this)
 
-    // check if input field contains time indicator type
-    // materialize field if no time indicator is present anymore
-    // if input field is already materialized, change to timestamp type
-    val materializer = new RexTimeIndicatorMaterializer(
-      rexBuilder,
-      input.getRowType.getFieldList.map(_.getType))
-
-    val condition = filter.getCondition.accept(materializer)
-    LogicalFilter.create(input, condition)
+    // We do not materialize time indicators in conditions because they can be locally evaluated.
+    // Some conditions are evaluated by special operators (e.g., time window joins).
+    // Time indicators in remaining conditions are materialized by Calc before the code generation.
+    LogicalFilter.create(input, filter.getCondition)
   }
 
   override def visit(project: LogicalProject): RelNode = {
@@ -306,68 +301,6 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
       updatedAggCalls)
   }
 
-  class RexTimeIndicatorMaterializer(
-      private val rexBuilder: RexBuilder,
-      private val input: Seq[RelDataType])
-    extends RexShuttle {
-
-    override def visitInputRef(inputRef: RexInputRef): RexNode = {
-      // reference is interesting
-      if (isTimeIndicatorType(inputRef.getType)) {
-        val resolvedRefType = input(inputRef.getIndex)
-        // input is a valid time indicator
-        if (isTimeIndicatorType(resolvedRefType)) {
-          inputRef
-        }
-        // input has been materialized
-        else {
-          new RexInputRef(inputRef.getIndex, resolvedRefType)
-        }
-      }
-      // reference is a regular field
-      else {
-        super.visitInputRef(inputRef)
-      }
-    }
-
-    override def visitCall(call: RexCall): RexNode = {
-      val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
-
-      // materialize operands with time indicators
-      val materializedOperands = updatedCall.getOperator match {
-
-        // skip materialization for special operators
-        case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
-          updatedCall.getOperands.toList
-
-        case _ =>
-          updatedCall.getOperands.map { o =>
-            if (isTimeIndicatorType(o.getType)) {
-              rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
-            } else {
-              o
-            }
-          }
-      }
-
-      // remove time indicator return type
-      updatedCall.getOperator match {
-
-        // we do not modify AS if operand has not been materialized
-        case SqlStdOperatorTable.AS if
-            isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
-          updatedCall
-
-        // materialize function's result and operands
-        case _ if isTimeIndicatorType(updatedCall.getType) =>
-          updatedCall.clone(timestamp, materializedOperands)
-
-        // materialize function's operands only
-        case _ =>
-          updatedCall.clone(updatedCall.getType, materializedOperands)
-      }
-    }
-  }
 }
 
 object RelTimeIndicatorConverter {
@@ -421,4 +354,89 @@ object RelTimeIndicatorConverter {
 
     new RelRecordType(fields)
   }
+
+  /**
+    * Materializes time indicator accesses in an expression.
+    *
+    * @param expr The expression in which time indicators are materialized.
+    * @param rowType The input schema of the expression.
+    * @param rexBuilder A RexBuilder.
+    *
+    * @return The expression with materialized time indicators.
+    */
+  def convertExpression(expr: RexNode, rowType: RelDataType, rexBuilder: RexBuilder): RexNode = {
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      rowType.getFieldList.map(_.getType))
+
+    expr.accept(materializer)
+  }
+}
+
+class RexTimeIndicatorMaterializer(
+    private val rexBuilder: RexBuilder,
+    private val input: Seq[RelDataType])
+  extends RexShuttle {
+
+  private val timestamp = rexBuilder
+    .getTypeFactory
+    .asInstanceOf[FlinkTypeFactory]
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode = {
+    // reference is interesting
+    if (isTimeIndicatorType(inputRef.getType)) {
+      val resolvedRefType = input(inputRef.getIndex)
+      // input is a valid time indicator
+      if (isTimeIndicatorType(resolvedRefType)) {
+        inputRef
+      }
+      // input has been materialized
+      else {
+        new RexInputRef(inputRef.getIndex, resolvedRefType)
+      }
+    }
+    // reference is a regular field
+    else {
+      super.visitInputRef(inputRef)
+    }
+  }
+
+  override def visitCall(call: RexCall): RexNode = {
+    val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+    // materialize operands with time indicators
+    val materializedOperands = updatedCall.getOperator match {
+
+      // skip materialization for special operators
+      case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+        updatedCall.getOperands.toList
+
+      case _ =>
+        updatedCall.getOperands.map { o =>
+          if (isTimeIndicatorType(o.getType)) {
+            rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+          } else {
+            o
+          }
+        }
+    }
+
+    // remove time indicator return type
+    updatedCall.getOperator match {
+
+      // we do not modify AS if operand has not been materialized
+      case SqlStdOperatorTable.AS if
+      isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
+        updatedCall
+
+      // materialize function's result and operands
+      case _ if isTimeIndicatorType(updatedCall.getType) =>
+        updatedCall.clone(timestamp, materializedOperands)
+
+      // materialize function's operands only
+      case _ =>
+        updatedCall.clone(updatedCall.getType, materializedOperands)
+    }
+  }
 }
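
The extracted convertExpression helper is what operators call to materialize time indicators late. A minimal sketch of the call-site pattern, mirroring the DataStreamCalc change further below (`condition`, `inputRowType`, and `cluster` are assumed to be in scope):

    import org.apache.calcite.rex.RexNode
    import org.apache.flink.table.calcite.RelTimeIndicatorConverter

    // wraps each time indicator access in the condition into a
    // TimeMaterializationSqlFunction call, so that code generation
    // only sees regular TIMESTAMP values
    val materialized: RexNode = RelTimeIndicatorConverter.convertExpression(
      condition,             // e.g., an expanded filter condition
      inputRowType,          // logical schema the condition refers to
      cluster.getRexBuilder)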

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 693924e..3e355ff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -22,12 +22,10 @@ import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 trait CommonCalc {
@@ -37,39 +35,26 @@ trait CommonCalc {
       ruleDescription: String,
       inputSchema: RowSchema,
       returnSchema: RowSchema,
-      calcProgram: RexProgram,
+      calcProjection: Seq[RexNode],
+      calcCondition: Option[RexNode],
       config: TableConfig,
       functionClass: Class[T]):
     GeneratedFunction[T, Row] = {
 
-    val expandedExpressions = calcProgram
-      .getProjectList
-      .map(expr => calcProgram.expandLocalRef(expr))
-      // time indicator fields must not be part of the code generation
-      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
-      // update indices
-      .map(expr => inputSchema.mapRexNode(expr))
-
-    val condition = if (calcProgram.getCondition != null) {
-      inputSchema.mapRexNode(calcProgram.expandLocalRef(calcProgram.getCondition))
-    } else {
-      null
-    }
-
     val projection = generator.generateResultExpression(
       returnSchema.physicalTypeInfo,
       returnSchema.physicalFieldNames,
-      expandedExpressions)
+      calcProjection)
 
     // only projection
-    val body = if (condition == null) {
+    val body = if (calcCondition.isEmpty) {
       s"""
         |${projection.code}
         |${generator.collectorTerm}.collect(${projection.resultTerm});
         |""".stripMargin
     }
     else {
-      val filterCondition = generator.generateExpression(condition)
+      val filterCondition = generator.generateExpression(calcCondition.get)
       // only filter
       if (projection == null) {
         s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index a923acc..7417eb2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -35,6 +35,8 @@ import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
+import scala.collection.JavaConverters._
+
 /**
   * Flink RelNode which matches along with LogicalCalc.
   *
@@ -90,12 +92,20 @@ class DataSetCalc(
 
     val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
+    val projection = calcProgram.getProjectList.asScala.map(calcProgram.expandLocalRef)
+    val condition = if (calcProgram.getCondition != null) {
+      Some(calcProgram.expandLocalRef(calcProgram.getCondition))
+    } else {
+      None
+    }
+
     val genFunction = generateFunction(
       generator,
       ruleDescription,
       new RowSchema(getInput.getRowType),
       new RowSchema(getRowType),
-      calcProgram,
+      projection,
+      condition,
       config,
       classOf[FlatMapFunction[Row, Row]])
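
With generateFunction now taking the projection and the optional condition separately, each caller splits its RexProgram itself. The extraction above can be written a bit more compactly; a sketch over the same calcProgram:

    import scala.collection.JavaConverters._

    // same extraction as above; Option(...) folds the null check into one step
    val projection = calcProgram.getProjectList.asScala.map(calcProgram.expandLocalRef)
    val condition = Option(calcProgram.getCondition).map(calcProgram.expandLocalRef)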
 

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index e8f3b82..1583e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -58,7 +58,7 @@ class DataSetJoin(
   with CommonJoin
   with DataSetRel {
 
-  override def deriveRowType() = rowRelDataType
+  override def deriveRowType(): RelDataType = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetJoin(

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index d626c46..2e00330 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -28,11 +28,14 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
 import org.apache.flink.table.plan.nodes.CommonCalc
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.CRowProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
+import scala.collection.JavaConverters._
+
 /**
   * Flink RelNode which matches along with FlatMapOperator.
   *
@@ -93,6 +96,25 @@ class DataStreamCalc(
     val inputDataStream =
       getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
+    // materialize time attributes in condition
+    val condition = if (calcProgram.getCondition != null) {
+      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
+        calcProgram.expandLocalRef(calcProgram.getCondition),
+        inputSchema.logicalType,
+        cluster.getRexBuilder)
+      Some(materializedCondition)
+    } else {
+      None
+    }
+
+    // filter out time attributes
+    val projection = calcProgram.getProjectList.asScala
+      .map(calcProgram.expandLocalRef)
+      // time indicator fields must not be part of the code generation
+      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
+      // update indices
+      .map(expr => inputSchema.mapRexNode(expr))
+
     val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo)
 
     val genFunction = generateFunction(
@@ -100,7 +122,8 @@ class DataStreamCalc(
       ruleDescription,
       inputSchema,
       schema,
-      calcProgram,
+      projection,
+      condition,
       config,
       classOf[ProcessFunction[CRow, CRow]])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 1315a79..987947c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
@@ -27,12 +28,12 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.updateutils.UpdateCheckUtils
 
 /**
-  * Flink RelNode which matches along with JoinOperator and its related operations.
+  * RelNode for a time-windowed stream join.
   */
 class DataStreamWindowJoin(
     cluster: RelOptCluster,
@@ -53,7 +54,7 @@ class DataStreamWindowJoin(
     with CommonJoin
     with DataStreamRel {
 
-  override def deriveRowType() = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.logicalType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamWindowJoin(
@@ -96,8 +97,8 @@ class DataStreamWindowJoin(
 
     val config = tableEnv.getConfig
 
-    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
-    val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+    val isLeftAppendOnly = UpdatingPlanChecker.isAppendOnly(left)
+    val isRightAppendOnly = UpdatingPlanChecker.isAppendOnly(right)
     if (!isLeftAppendOnly || !isRightAppendOnly) {
       throw new TableException(
         "Windowed stream join does not support updates.")
@@ -124,21 +125,20 @@ class DataStreamWindowJoin(
 
     joinType match {
       case JoinRelType.INNER =>
-        isRowTime match {
-          case false =>
-            // Proctime JoinCoProcessFunction
-            createProcTimeInnerJoinFunction(
-              leftDataStream,
-              rightDataStream,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          case true =>
-            // RowTime JoinCoProcessFunction
-            throw new TableException(
-              "RowTime inner join between stream and stream is not supported yet.")
+        if (isRowTime) {
+          // RowTime JoinCoProcessFunction
+          throw new TableException(
+            "RowTime inner join between stream and stream is not supported yet.")
+        } else {
+          // Proctime JoinCoProcessFunction
+          createProcTimeInnerJoinFunction(
+            leftDataStream,
+            rightDataStream,
+            joinFunction.name,
+            joinFunction.code,
+            leftKeys,
+            rightKeys
+          )
         }
       case JoinRelType.FULL =>
         throw new TableException(
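
For illustration, a query shape that this RelNode currently translates, sketched with hypothetical tables that each carry a processing-time attribute proctime:

    SELECT o.id, p.amount
    FROM Orders o, Payments p
    WHERE o.id = p.orderId AND
          p.proctime BETWEEN o.proctime - INTERVAL '1' HOUR AND o.proctime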

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
index c7a190f..2075689 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -21,13 +21,16 @@ package org.apache.flink.table.plan.rules.datastream
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamWindowJoin
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.join.WindowJoinUtil
 
+import scala.collection.JavaConverters._
+
 class DataStreamWindowJoinRule
   extends ConverterRule(
     classOf[FlinkLogicalJoin],
@@ -39,18 +42,36 @@ class DataStreamWindowJoinRule
     val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
     val joinInfo = join.analyzeCondition
 
-    try {
-      WindowJoinUtil.analyzeTimeBoundary(
-        joinInfo.getRemaining(join.getCluster.getRexBuilder),
-        join.getLeft.getRowType.getFieldCount,
-        new RowSchema(join.getRowType),
-        join.getCluster.getRexBuilder,
-        TableConfig.DEFAULT)
-      true
-    } catch {
-      case _: TableException =>
+    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+      joinInfo.getRemaining(join.getCluster.getRexBuilder),
+      join.getLeft.getRowType.getFieldCount,
+      join.getRowType,
+      join.getCluster.getRexBuilder,
+      TableConfig.DEFAULT)
+
+    // remaining predicate must not access time attributes
+    val remainingPredsAccessTime = remainingPreds.isDefined &&
+      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
+
+    if (windowBounds.isDefined) {
+      if (windowBounds.get.isEventTime) {
+        // we cannot handle event-time window joins yet
         false
+      } else {
+        // Check that no event-time attributes are in the input.
+        // The proc-time join implementation does not ensure that record timestamps are
+        // correctly set. The output timestamp is always that of the later arriving record.
+        // We rely on projection pushdown to remove unused attributes before the join.
+        val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+          .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+        !remainingPredsAccessTime && !rowTimeAttrInOutput
+      }
+    } else {
+      // the given join does not have valid window bounds. We cannot translate it.
+      false
     }
   }
 
   override def convert(rel: RelNode): RelNode = {
@@ -63,11 +84,11 @@ class DataStreamWindowJoinRule
     val leftRowSchema = new RowSchema(convLeft.getRowType)
     val rightRowSchema = new RowSchema(convRight.getRowType)
 
-    val (isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition) =
-      WindowJoinUtil.analyzeTimeBoundary(
+    val (windowBounds, remainCondition) =
+      WindowJoinUtil.extractWindowBoundsFromPredicate(
         joinInfo.getRemaining(join.getCluster.getRexBuilder),
         leftRowSchema.logicalArity,
-        new RowSchema(join.getRowType),
+        join.getRowType,
         join.getCluster.getRexBuilder,
         TableConfig.DEFAULT)
 
@@ -81,9 +102,9 @@ class DataStreamWindowJoinRule
       leftRowSchema,
       rightRowSchema,
       new RowSchema(rel.getRowType),
-      isRowTime,
-      leftLowerBoundary,
-      leftUpperBoundary,
+      windowBounds.get.isEventTime,
+      windowBounds.get.leftLowerBound,
+      windowBounds.get.leftUpperBound,
       remainCondition,
       description)
   }
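
Condensed, the new matches logic accepts a join only if all four checks pass; the following is a restatement of the code above with the same names, not new behavior:

    val translatable =
      windowBounds.isDefined &&          // two predicates bound the time in both directions
      !windowBounds.get.isEventTime &&   // event-time window joins are not supported yet
      !remainingPredsAccessTime &&       // residual predicates must not access time attributes
      !rowTimeAttrInOutput               // no row-time attributes may be in the join output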

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
new file mode 100644
index 0000000..6a160f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.datastream._
+
+import _root_.scala.collection.JavaConverters._
+
+object UpdatingPlanChecker {
+
+  /** Validates that the plan produces only append changes. */
+  def isAppendOnly(plan: RelNode): Boolean = {
+    val appendOnlyValidator = new AppendOnlyValidator
+    appendOnlyValidator.go(plan)
+
+    appendOnlyValidator.isAppendOnly
+  }
+
+  /** Extracts the unique keys of the table produced by the plan. */
+  def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+    val keyExtractor = new UniqueKeyExtractor
+    keyExtractor.go(plan)
+    keyExtractor.keys
+  }
+
+  private class AppendOnlyValidator extends RelVisitor {
+
+    var isAppendOnly = true
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case s: DataStreamRel if s.producesUpdates =>
+          isAppendOnly = false
+        case _ =>
+          super.visit(node, ordinal, parent)
+      }
+    }
+  }
+
+  /** Identifies unique key fields in the output of a RelNode. */
+  private class UniqueKeyExtractor extends RelVisitor {
+
+    var keys: Option[Array[String]] = None
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case c: DataStreamCalc =>
+          super.visit(node, ordinal, parent)
+          // check if input has keys
+          if (keys.isDefined) {
+            // track keys forward
+            val inNames = c.getInput.getRowType.getFieldNames
+            val inOutNames = c.getProgram.getNamedProjects.asScala
+              .map(p => {
+                c.getProgram.expandLocalRef(p.left) match {
+                  // output field is forwarded input field
+                  case i: RexInputRef => (i.getIndex, p.right)
+                  // output field is renamed input field
+                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+                    a.getOperands.get(0) match {
+                      case ref: RexInputRef =>
+                        (ref.getIndex, p.right)
+                      case _ =>
+                        (-1, p.right)
+                    }
+                  // output field is not forwarded from input
+                  case _: RexNode => (-1, p.right)
+                }
+              })
+              // filter all non-forwarded fields
+              .filter(_._1 >= 0)
+              // resolve names of input fields
+              .map(io => (inNames.get(io._1), io._2))
+
+            // filter by input keys
+            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+            // check if all keys have been preserved
+            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+              // all keys have been preserved (but possibly renamed)
+              keys = Some(outKeys.toArray)
+            } else {
+              // some (or all) keys have been removed. Keys are no longer unique and removed
+              keys = None
+            }
+          }
+        case _: DataStreamOverAggregate =>
+          // keys are always forwarded by an Over aggregate
+          super.visit(node, ordinal, parent)
+        case a: DataStreamGroupAggregate =>
+          // get grouping keys
+          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+          keys = Some(groupKeys.toArray)
+        case w: DataStreamGroupWindowAggregate =>
+          // get grouping keys
+          val groupKeys =
+            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+          // get window start and end time
+          val windowStartEnd = w.getWindowProperties.map(_.name)
+          // we have only a unique key if at least one window property is selected
+          if (windowStartEnd.nonEmpty) {
+            keys = Some(groupKeys ++ windowStartEnd)
+          }
+        case _: DataStreamRel =>
+          // anything else does not forward keys or might duplicate keys, so we can stop
+          keys = None
+      }
+    }
+
+  }
+
+}
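
To illustrate the key extraction for group window aggregates, a sketch with hypothetical table and field names: the query below yields the unique key (user, wStart) because a window property is selected; without TUMBLE_START in the SELECT clause no unique key would be derived:

    SELECT user,
           TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS wStart,
           SUM(amount) AS total
    FROM Orders
    GROUP BY user, TUMBLE(rowtime, INTERVAL '1' HOUR)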

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
index 97d5ccc..8a3ba43 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -59,24 +59,26 @@ class ProcTimeWindowInnerJoin(
 
   private var cRowWrapper: CRowWrappingCollector = _
 
-  /** other condition function **/
+  // other condition function
   private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
 
-  /** tmp list to store expired records **/
-  private var listToRemove: JList[Long] = _
+  // tmp list to store expired records
+  private var removeList: JList[Long] = _
 
-  /** state to hold left stream element **/
+  // state to hold left stream element
   private var row1MapState: MapState[Long, JList[Row]] = _
-  /** state to hold right stream element **/
+  // state to hold right stream element
   private var row2MapState: MapState[Long, JList[Row]] = _
 
-  /** state to record last timer of left stream, 0 means no timer **/
+  // state to record last timer of left stream, 0 means no timer
   private var timerState1: ValueState[Long] = _
-  /** state to record last timer of right stream, 0 means no timer **/
+  // state to record last timer of right stream, 0 means no timer
   private var timerState2: ValueState[Long] = _
 
-  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
-  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
+  // compute window sizes, i.e., how long to keep rows in state.
+  // window size of -1 means rows do not need to be put into state.
+  private val leftStreamWinSize: Long = if (leftLowerBound <= 0) -leftLowerBound else -1
+  private val rightStreamWinSize: Long = if (leftUpperBound >= 0) leftUpperBound else -1
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
@@ -90,7 +92,7 @@ class ProcTimeWindowInnerJoin(
     LOG.debug("Instantiating JoinFunction.")
     joinFunction = clazz.newInstance()
 
-    listToRemove = new util.ArrayList[Long]()
+    removeList = new util.ArrayList[Long]()
     cRowWrapper = new CRowWrappingCollector()
     cRowWrapper.setChange(true)
 
@@ -140,7 +142,7 @@ class ProcTimeWindowInnerJoin(
       row2MapState,
       -leftUpperBound,     // right stream lower
       -leftLowerBound,     // right stream upper
-      true
+      isLeft = true
     )
   }
 
@@ -165,9 +167,9 @@ class ProcTimeWindowInnerJoin(
       timerState2,
       row2MapState,
       row1MapState,
-      leftLowerBound,    // left stream upper
+      leftLowerBound,    // left stream lower
       leftUpperBound,    // left stream upper
-      false
+      isLeft = false
     )
   }
 
@@ -217,66 +219,82 @@ class ProcTimeWindowInnerJoin(
       winSize: Long,
       timerState: ValueState[Long],
       rowMapState: MapState[Long, JList[Row]],
-      oppoRowMapState: MapState[Long, JList[Row]],
-      oppoLowerBound: Long,
-      oppoUpperBound: Long,
+      otherRowMapState: MapState[Long, JList[Row]],
+      otherLowerBound: Long,
+      otherUpperBound: Long,
       isLeft: Boolean): Unit = {
 
     cRowWrapper.out = out
 
-    val value = valueC.row
+    val row = valueC.row
 
     val curProcessTime = ctx.timerService.currentProcessingTime
-    val oppoLowerTime = curProcessTime + oppoLowerBound
-    val oppoUpperTime = curProcessTime + oppoUpperBound
-
-    // only when windowsize != 0, we need to store the element
-    if (winSize != 0) {
-      // register a timer to expire the element
-      if (timerState.value == 0) {
-        ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
-        timerState.update(curProcessTime + winSize + 1)
-      }
+    val otherLowerTime = curProcessTime + otherLowerBound
+    val otherUpperTime = curProcessTime + otherUpperBound
 
+    if (winSize >= 0) {
+      // put row into state for later joining.
+      // (winSize == 0) joins rows received in the same millisecond.
       var rowList = rowMapState.get(curProcessTime)
       if (rowList == null) {
         rowList = new util.ArrayList[Row]()
       }
-      rowList.add(value)
+      rowList.add(row)
       rowMapState.put(curProcessTime, rowList)
 
+      // register a timer to remove the row from state once it is expired
+      if (timerState.value == 0) {
+        val cleanupTime = curProcessTime + winSize + 1
+        ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+        timerState.update(cleanupTime)
+      }
     }
 
-    // loop the other stream elements
-    val oppositeKeyIter = oppoRowMapState.keys().iterator()
-    while (oppositeKeyIter.hasNext) {
-      val eleTime = oppositeKeyIter.next()
-      if (eleTime < oppoLowerTime) {
-        listToRemove.add(eleTime)
-      } else if (eleTime <= oppoUpperTime) {
-        val oppoRowList = oppoRowMapState.get(eleTime)
-        var i = 0
-        if (isLeft) {
-          while (i < oppoRowList.size) {
-            joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
+    // join row with rows received from the other input
+    val otherTimeIter = otherRowMapState.keys().iterator()
+    if (isLeft) {
+      // go over all timestamps in the other input's state
+      while (otherTimeIter.hasNext) {
+        val otherTimestamp = otherTimeIter.next()
+        if (otherTimestamp < otherLowerTime) {
+          // other timestamp is expired. Remove it later.
+          removeList.add(otherTimestamp)
+        } else if (otherTimestamp <= otherUpperTime) {
+          // join row with all rows from the other input for this timestamp
+          val otherRows = otherRowMapState.get(otherTimestamp)
+          var i = 0
+          while (i < otherRows.size) {
+            joinFunction.join(row, otherRows.get(i), cRowWrapper)
             i += 1
           }
-        } else {
-          while (i < oppoRowList.size) {
-            joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
+        }
+      }
+    } else {
+      // go over all timestamps in the other input's state
+      while (otherTimeIter.hasNext) {
+        val otherTimestamp = otherTimeIter.next()
+        if (otherTimestamp < otherLowerTime) {
+          // other timestamp is expired. Remove it later.
+          removeList.add(otherTimestamp)
+        } else if (otherTimestamp <= otherUpperTime) {
+          // join row with all rows from the other input for this timestamp
+          val otherRows = otherRowMapState.get(otherTimestamp)
+          var i = 0
+          while (i < otherRows.size) {
+            joinFunction.join(otherRows.get(i), row, cRowWrapper)
             i += 1
           }
         }
       }
     }
 
-    // expire records out-of-time
-    var i = listToRemove.size - 1
+    // remove rows for expired timestamps
+    var i = removeList.size - 1
     while (i >= 0) {
-      oppoRowMapState.remove(listToRemove.get(i))
+      otherRowMapState.remove(removeList.get(i))
       i -= 1
     }
-    listToRemove.clear()
+    removeList.clear()
   }
 
   /**
@@ -292,32 +310,34 @@ class ProcTimeWindowInnerJoin(
 
     val expiredTime = curTime - winSize
     val keyIter = rowMapState.keys().iterator()
-    var nextTimer: Long = 0
+    var validTimestamp: Boolean = false
     // Search for expired timestamps.
     // If we find a non-expired timestamp, remember the timestamp and leave the loop.
     // This way we find all expired timestamps if they are sorted without doing a full pass.
-    while (keyIter.hasNext && nextTimer == 0) {
+    while (keyIter.hasNext && !validTimestamp) {
       val recordTime = keyIter.next
       if (recordTime < expiredTime) {
-        listToRemove.add(recordTime)
+        removeList.add(recordTime)
       } else {
-        nextTimer = recordTime
+        // we found a timestamp that is still valid
+        validTimestamp = true
       }
     }
 
     // Remove expired records from state
-    var i = listToRemove.size - 1
+    var i = removeList.size - 1
     while (i >= 0) {
-      rowMapState.remove(listToRemove.get(i))
+      rowMapState.remove(removeList.get(i))
       i -= 1
     }
-    listToRemove.clear()
+    removeList.clear()
 
     // If the state has non-expired timestamps, register a new timer.
     // Otherwise clean the complete state for this input.
-    if (nextTimer != 0) {
-      ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
-      timerState.update(nextTimer + winSize + 1)
+    if (validTimestamp) {
+      val cleanupTime = curTime + winSize + 1
+      ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+      timerState.update(cleanupTime)
     } else {
       timerState.clear()
       rowMapState.clear()
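
A worked example for the window-size computation above (a sketch; all bounds in milliseconds): for the predicate l.proctime BETWEEN r.proctime - INTERVAL '4' HOUR AND r.proctime, the extracted bounds are leftLowerBound = -4 hours and leftUpperBound = 0:

    val leftLowerBound = -4 * 60 * 60 * 1000L  // -4 hours in milliseconds
    val leftUpperBound = 0L

    // left rows may join right rows arriving up to 4 hours later: keep them 4h
    val leftStreamWinSize = if (leftLowerBound <= 0) -leftLowerBound else -1L   // 14400000
    // right rows can only join left rows of the same millisecond: window size 0
    val rightStreamWinSize = if (leftUpperBound >= 0) leftUpperBound else -1L   // 0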

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index fabeeba..c38d915 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.runtime.join
 
-import java.math.{BigDecimal => JBigDecimal}
 import java.util
 
 import org.apache.calcite.plan.RelOptUtil
@@ -27,263 +26,362 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlKind
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
 /**
-  * An util class to help analyze and build join code .
+  * A utility class to optimize and generate window joins.
   */
 object WindowJoinUtil {
 
+  case class WindowBounds(isEventTime: Boolean, leftLowerBound: Long, leftUpperBound: Long)
+
+  protected case class WindowBound(bound: Long, isLeftLower: Boolean)
+  protected case class TimePredicate(
+    isEventTime: Boolean,
+    leftInputOnLeftSide: Boolean,
+    pred: RexCall)
+  protected case class TimeAttributeAccess(isEventTime: Boolean, isLeftInput: Boolean)
+
   /**
-    * Analyze time-condtion to get time boundary for each stream and get the time type
-    * and return remain condition.
+    * Extracts the window bounds from a join predicate.
+    *
+    * @param  predicate           join predicate
+    * @param  leftLogicalFieldCnt number of attributes on the left join input
+    * @param  inputSchema         schema of the join result
+    * @param  rexBuilder          RexBuilder
+    * @param  config              TableConfig
     *
-    * @param  condition           join condition
-    * @param  leftLogicalFieldCnt left stream logical field num
-    * @param  inputSchema         join rowtype schema
-    * @param  rexBuilder          util to build rexNode
-    * @param  config              table environment config
-    * @return isRowTime, left lower boundary, right lower boundary, remain condition
+    * @return A Tuple2 of extracted window bounds and remaining predicates.
     */
-  private[flink] def analyzeTimeBoundary(
-      condition: RexNode,
+  private[flink] def extractWindowBoundsFromPredicate(
+      predicate: RexNode,
       leftLogicalFieldCnt: Int,
-      inputSchema: RowSchema,
+      inputSchema: RelDataType,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+      config: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
 
     // Converts the condition to conjunctive normal form (CNF)
-    val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+    val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
 
     // split the condition into time indicator condition and other condition
-    val (timeTerms, remainTerms) = cnfCondition match {
-      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
-        c.getOperands.asScala
-          .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
-          .reduceLeft((l, r) => {
-            (l._1 ++ r._1, l._2 ++ r._2)
-          })
-      case _ =>
-        throw new TableException("A time-based stream join requires exactly " +
-          "two join predicates that bound the time in both directions.")
+    val (timePreds, otherPreds) = cnfCondition match {
+        // We need at least two comparison predicates for a properly bounded window join.
+        // So we need an AND expression for a valid window join.
+        case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+          c.getOperands.asScala
+            .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+            .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+              analyzed match {
+                case Left(timePred) => (preds._1 :+ timePred, preds._2)
+                case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+              }
+            })
+        case _ =>
+          // No valid window bounds. A windowed stream join requires two comparison predicates that
+          // bound the time in both directions.
+          return (None, Some(predicate))
     }
 
-    if (timeTerms.size != 2) {
-      throw new TableException("A time-based stream join requires exactly " +
-        "two join predicates that bound the time in both directions.")
+    if (timePreds.size != 2) {
+      // No valid window bounds. A windowed stream join requires two comparison predicates that
+      // bound the time in both directions.
+      return (None, Some(predicate))
     }
 
-    // extract time offset from the time indicator conditon
-    val streamTimeOffsets =
-    timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, config))
-
+    // assemble window bounds from predicates
+    val streamTimeOffsets = timePreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
     val (leftLowerBound, leftUpperBound) =
       streamTimeOffsets match {
-        case Seq((x, true), (y, false)) => (x, y)
-        case Seq((x, false), (y, true)) => (y, x)
+        case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+          (x.bound, y.bound)
+        case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+          (y.bound, x.bound)
         case _ =>
-          throw new TableException(
-            "Time-based join conditions must reference the time attribute of both input tables.")
+          // Window join requires two comparison predicates that bound the time in both directions.
+          return (None, Some(predicate))
       }
 
     // compose the remain condition list into one condition
     val remainCondition =
-    remainTerms match {
-      case Seq() => None
+    otherPreds match {
+      case Seq() =>
+        None
       case _ =>
-        // Converts logical field references to physical ones.
-        Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
-          RelOptUtil.andJoinFilters(rexBuilder, l, r)
-        }))
+        Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
     }
 
-    val isRowTime: Boolean = timeTerms(0)._1 match {
-      case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
-      case _ => true
-    }
-    (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
+    val bounds = Some(WindowBounds(timePreds.head.isEventTime, leftLowerBound, leftUpperBound))
+
+    (bounds, remainCondition)
   }
 
   /**
-    * Split the join conditions into time condition and non-time condition
+    * Analyzes a predicate and identifies whether it is a valid predicate for a window join.
+    * A valid window join predicate is a comparison predicate (<, <=, >=, >) that accesses
+    * time attributes of both inputs, each input on a different side of the condition.
+    * Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
+    *
+    * Examples:
+    * - left.rowtime > right.rowtime + 2.minutes => valid
+    * - left.proctime < right.rowtime + 2.minutes => invalid: different time type
+    * - left.rowtime == right.rowtime + 2.minutes => invalid: not a comparison predicate
+    * - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
     *
-    * @return (Seq(timeTerms), Seq(remainTerms)),
+    * If the predicate is a regular join predicate, i.e., it accesses no time attribute, it is
+    * returned as well.
+    *
+    * @return Either a valid time predicate (Left) or a valid non-time predicate (Right)
     */
-  private def analyzeCondtionTermType(
-      conditionTerm: RexNode,
+  private def identifyTimePredicate(
+      pred: RexNode,
       leftFieldCount: Int,
-      inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], Seq[RexNode]) = {
-
-    conditionTerm match {
-      case c: RexCall if Seq(SqlKind.GREATER_THAN, SqlKind.GREATER_THAN_OR_EQUAL,
-        SqlKind.LESS_THAN, SqlKind.LESS_THAN_OR_EQUAL).contains(c.getKind) =>
-        val timeIndicators = extractTimeIndicatorAccesses(c, leftFieldCount, inputType)
-        timeIndicators match {
-          case Seq() =>
-            (Seq(), Seq(c))
-          case Seq(v1, v2) =>
-            if (v1._1 != v2._1) {
-              throw new TableException(
-                "Both time attributes in a join condition must be of the same type.")
+      inputType: RelDataType): Either[TimePredicate, RexNode] = {
+
+    pred match {
+      case c: RexCall =>
+        c.getKind match {
+          case SqlKind.GREATER_THAN |
+               SqlKind.GREATER_THAN_OR_EQUAL |
+               SqlKind.LESS_THAN |
+               SqlKind.LESS_THAN_OR_EQUAL =>
+
+            val leftTerm = c.getOperands.get(0)
+            val rightTerm = c.getOperands.get(1)
+
+            // validate that both sides of the condition do not access non-time attributes
+            if (accessesNonTimeAttribute(leftTerm, inputType) ||
+                accessesNonTimeAttribute(rightTerm, inputType)) {
+              return Right(pred)
             }
-            if (v1._2 == v2._2) {
-              throw new TableException("Time-based join conditions " +
-                "must reference the time attribute of both input tables.")
+
+            // get time attribute on left side of comparison
+            val leftTimeAttrAccess =
+              extractTimeAttributeAccesses(leftTerm, leftFieldCount, inputType) match {
+                case Seq() => None
+                case Seq(a) => Some(a)
+                case _ =>
+                  // Window join predicate may only access a single time attribute on each side.
+                  return Right(pred)
+              }
+
+            // get time attribute on right side of comparison
+            val rightTimeAttrAccess =
+              extractTimeAttributeAccesses(rightTerm, leftFieldCount, inputType) match {
+                case Seq() => None
+                case Seq(a) => Some(a)
+                case _ =>
+                  // Window join predicate may only access a single time attribute on each side.
+                  return Right(pred)
+              }
+
+            // check if both sides of the condition access a time attribute,
+            // if both join inputs are accessed, and
+            // if both accesses are on the same time type (event-time or proc-time)
+            (leftTimeAttrAccess, rightTimeAttrAccess) match {
+              case (None, None) =>
+                // neither left nor right accesses a time attribute. This is a regular join predicate.
+                Right(c)
+              case (Some(_), None) | (None, Some(_)) =>
+                // Both sides of a window join predicate must access a time attribute.
+                Right(pred)
+              case (Some(left), Some(right)) if left.isEventTime != right.isEventTime =>
+                // Both time attributes in a window join predicate must be of the same type.
+                Right(pred)
+              case (Some(left), Some(right)) if left.isLeftInput == right.isLeftInput =>
+                // Window join predicates must reference the time attribute of both inputs.
+                Right(pred)
+              case (Some(left), Some(_)) =>
+                Left(TimePredicate(left.isEventTime, left.isLeftInput, c))
             }
-            (Seq((v1._1, v1._2, c)), Seq())
-          case _ =>
-            throw new TableException(
-              "Time-based join conditions must reference the time attribute of both input tables.")
+          // not a comparison predicate.
+          case _ => Right(pred)
         }
       case other =>
-        val timeIndicators = extractTimeIndicatorAccesses(other, leftFieldCount, inputType)
-        timeIndicators match {
-          case Seq() =>
-            (Seq(), Seq(other))
-          case _ =>
-            throw new TableException("Time indicators can not be used in non time-condition.")
-        }
+        Right(other)
     }
   }
 
   /**
-    * Extracts all time indicator attributes that are accessed in an expression.
+    * Extracts all time attributes that are accessed in an expression.
     *
-    * @return seq(timeType, is left input time indicator)
+    * @return A Seq of all time attributes accessed in the expression.
     */
-  def extractTimeIndicatorAccesses(
-      expression: RexNode,
+  def extractTimeAttributeAccesses(
+      expr: RexNode,
       leftFieldCount: Int,
-      inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
+      inputType: RelDataType): Seq[TimeAttributeAccess] = {
 
-    expression match {
+    expr match {
       case i: RexInputRef =>
+        // check if time attribute is accessed and from which input side
         val idx = i.getIndex
         inputType.getFieldList.get(idx).getType match {
-          case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
-            // left table time indicator
-            Seq((t, true))
           case t: TimeIndicatorRelDataType =>
-            // right table time indicator
-            Seq((t, false))
-          case _ => Seq()
+            // time attribute access. Remember time type and side of input
+            val isLeftInput = idx < leftFieldCount
+            Seq(TimeAttributeAccess(t.isEventTime, isLeftInput))
+          case _ =>
+            // not a time attribute access.
+            Seq()
         }
       case c: RexCall =>
+        // concatenate the time attributes of all operands
+        // (flatMap also covers niladic calls, where reduce would fail on an empty list)
         c.operands.asScala
-          .map(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
-          .reduce(_ ++ _)
+          .flatMap(extractTimeAttributeAccesses(_, leftFieldCount, inputType))
       case _ => Seq()
     }
   }
 
   /**
+    * Checks if an expression accesses a time attribute.
+    *
+    * @param expr The expression to check.
+    * @param inputType The input type of the expression.
+    * @return True, if the expression accesses a time attribute. False otherwise.
+    */
+  def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+    expr match {
+      case i: RexInputRef =>
+        val accessedType = inputType.getFieldList.get(i.getIndex).getType
+        accessedType match {
+          case _: TimeIndicatorRelDataType => true
+          case _ => false
+        }
+      case c: RexCall =>
+        c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
+      case _ => false
+    }
+  }
+
+  /**
+    * Checks if an expression accesses a non-time attribute.
+    *
+    * @param expr The expression to check.
+    * @param inputType The input type of the expression.
+    * @return True, if the expression accesses a non-time attribute. False otherwise.
+    */
+  def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+    expr match {
+      case i: RexInputRef =>
+        val accessedType = inputType.getFieldList.get(i.getIndex).getType
+        accessedType match {
+          case _: TimeIndicatorRelDataType => false
+          case _ => true
+        }
+      case c: RexCall =>
+        c.operands.asScala.exists(accessesNonTimeAttribute(_, inputType))
+      case _ => false
+    }
+  }
+
+  /**
     * Computes the absolute bound on the left operand of a comparison expression and
     * whether the bound is an upper or lower bound.
     *
     * @return The window boundary and whether it bounds the left input from below,
     *         or None if the predicate could not be reduced to constant bounds.
     */
-  def extractTimeOffsetFromCondition(
-      timeTerm: RexNode,
-      isLeftExprBelongLeftTable: Boolean,
+  def computeWindowBoundFromPredicate(
+      timePred: TimePredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Long, Boolean) = {
-
-    val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
+      config: TableConfig): Option[WindowBound] = {
 
     val isLeftLowerBound: Boolean =
-      timeTerm.getKind match {
-        // e.g a.proctime > b.proctime - 5 sec, then it's the lower bound of a and the value is -5
-        // e.g b.proctime > a.proctime - 5 sec, then it's not the lower bound of a but upper bound
-        case kind@(SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
-          isLeftExprBelongLeftTable
-        // e.g a.proctime < b.proctime + 5 sec, the the upper bound of a is 5
-        case kind@(SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
-          !isLeftExprBelongLeftTable
-        case _ =>
-          throw new TableException("Unsupported time-condition.")
+      timePred.pred.getKind match {
+        case (SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
+          timePred.leftInputOnLeftSide
+        case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
+          !timePred.leftInputOnLeftSide
       }
 
-    val (leftLiteral, rightLiteral) =
-      reduceTimeExpression(
-        timeCall.operands.get(0),
-        timeCall.operands.get(1),
-        rexBuilder,
-        config)
-    val tmpTimeOffset: Long =
-      if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else leftLiteral - rightLiteral
-
-    val boundary =
-      tmpTimeOffset.signum * (
-        if (timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == SqlKind.GREATER_THAN) {
-          tmpTimeOffset.abs - 1
-        } else {
-          tmpTimeOffset.abs
-        })
-
-    (boundary, isLeftLowerBound)
+    // reduce predicate to constants to compute bounds
+    val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, config)
+
+    if (leftLiteral.isEmpty || rightLiteral.isEmpty) {
+      return None
+    }
+
+    // compute boundary
+    val tmpTimeOffset: Long = if (timePred.leftInputOnLeftSide) {
+      rightLiteral.get - leftLiteral.get
+    } else {
+      leftLiteral.get - rightLiteral.get
+    }
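+    // tighten strict comparisons to inclusive bounds,
+    // e.g., a < b + 5 sec becomes a - b <= 4999 ms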
+    val boundary = timePred.pred.getKind match {
+      case SqlKind.LESS_THAN =>
+        tmpTimeOffset - 1
+      case SqlKind.GREATER_THAN =>
+        tmpTimeOffset + 1
+      case _ =>
+        tmpTimeOffset
+    }
+
+    Some(WindowBound(boundary, isLeftLowerBound))
   }
 
   /**
-    * Calculates the time boundary by replacing the time attribute by a zero literal
-    * and reducing the expression.
-    * For example:
-    * b.proctime - interval '1' second - interval '2' second will be translated to
-    * 0 - 1000 - 2000
+    * Replaces the time attributes on both sides of a time predicate by a zero literal and
+    * reduces the expressions on both sides to a long literal.
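+    * For example, b.proctime - INTERVAL '1' SECOND - INTERVAL '2' SECOND is replaced by
+    * 0 - 1000 - 2000 and reduced to the literal -3000.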
+    *
+    * @param timePred The time predicate whose two sides are reduced.
+    * @param rexBuilder A RexBuilder
+    * @param config A TableConfig.
+    * @return The values of the reduced literals on both sides of the time comparison predicate.
     */
   private def reduceTimeExpression(
-      leftRexNode: RexNode,
-      rightRexNode: RexNode,
+      timePred: TimePredicate,
       rexBuilder: RexBuilder,
-      config: TableConfig): (Long, Long) = {
+      config: TableConfig): (Option[Long], Option[Long]) = {
 
     /**
-      * replace the rowtime/proctime with zero literal.
+      * Replaces time attributes with a zero literal.
       */
     def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
       expr match {
         case c: RexCall =>
           // replace in call operands
-          val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral(_)).asJava
+          val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral).asJava
           rexBuilder.makeCall(c.getType, c.getOperator, newOps)
         case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) =>
           // replace with timestamp
           rexBuilder.makeZeroLiteral(expr.getType)
-        case _: RexInputRef =>
-          throw new TableException("Time join condition may only reference time indicator fields.")
         case _ => expr
       }
     }
 
-    val literalLeftRex = replaceTimeFieldWithLiteral(leftRexNode)
-    val literalRightRex = replaceTimeFieldWithLiteral(rightRexNode)
+    val leftSide = timePred.pred.operands.get(0)
+    val rightSide = timePred.pred.operands.get(1)
 
+    val leftSideWithLiteral = replaceTimeFieldWithLiteral(leftSide)
+    val rightSideWithLiteral = replaceTimeFieldWithLiteral(rightSide)
+
+    // reduce expression to literal
     val exprReducer = new ExpressionReducer(config)
     val originList = new util.ArrayList[RexNode]()
-    originList.add(literalLeftRex)
-    originList.add(literalRightRex)
+    originList.add(leftSideWithLiteral)
+    originList.add(rightSideWithLiteral)
     val reduceList = new util.ArrayList[RexNode]()
     exprReducer.reduce(rexBuilder, originList, reduceList)
 
-    val literals = reduceList.asScala.map(f => f match {
+    // extract values from the reduced literals
+    val literals = reduceList.asScala.map {
       case literal: RexLiteral =>
-        literal.getValue2.asInstanceOf[Long]
+        Some(literal.getValue2.asInstanceOf[Long])
       case _ =>
-        throw TableException(
-          "Time condition may only consist of time attributes, literals, and arithmetic operators.")
-    })
+        None
+    }
 
-    (literals(0), literals(1))
+    (literals.head, literals(1))
   }
 
-
   /**
-    * Generate other non-equi condition function
+    * Generates a JoinFunction that applies additional join predicates and projects the result.
     *
     * @param  config          table env config
     * @param  joinType        join type to determine whether inputs can be null
@@ -300,7 +398,7 @@ object WindowJoinUtil {
       rightType: TypeInformation[Row],
       returnType: RowSchema,
       otherCondition: Option[RexNode],
-      ruleDescription: String) = {
+      ruleDescription: String): GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row] = {
 
     // whether input can be null
     val nullCheck = joinType match {
@@ -311,7 +409,7 @@ object WindowJoinUtil {
     }
 
     // generate code for the remaining non-equi condition
-    val generator = new CodeGenerator(
+    val generator = new FunctionCodeGenerator(
       config,
       nullCheck,
       leftType,
@@ -329,7 +427,9 @@ object WindowJoinUtil {
            |${generator.collectorTerm}.collect(${conversion.resultTerm});
            |""".stripMargin
       case Some(remainCondition) =>
-        val genCond = generator.generateExpression(remainCondition)
+        // map logical field accesses to physical accesses
+        val physicalCondition = returnType.mapRexNode(remainCondition)
+        val genCond = generator.generateExpression(physicalCondition)
         s"""
            |${genCond.code}
            |if (${genCond.resultTerm}) {
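
The bound computation above combines two steps: reduceTimeExpression replaces the time
attributes on both sides of the comparison with zero literals and constant-folds the
remaining arithmetic, and computeWindowBoundFromPredicate tightens strict comparisons by
one millisecond to obtain inclusive bounds. A minimal standalone sketch of that
arithmetic (object and method names here are hypothetical, not Flink API):

    // Hypothetical sketch of the window bound arithmetic; not Flink API.
    object WindowBoundSketch {

      // Step 1: with the time attribute replaced by a zero literal, a side like
      // b.proctime - INTERVAL '1' SECOND - INTERVAL '2' SECOND folds to
      // 0 - 1000 - 2000 = -3000 milliseconds.
      def reduceSide(offsetsMs: Seq[Long]): Long = offsetsMs.sum

      // Step 2: strict comparisons are tightened by one millisecond so that the
      // resulting window bounds are inclusive.
      sealed trait Cmp
      case object Lt extends Cmp; case object Le extends Cmp
      case object Gt extends Cmp; case object Ge extends Cmp

      def inclusiveBound(offsetMs: Long, cmp: Cmp): Long = cmp match {
        case Lt => offsetMs - 1 // a <  b + 5 sec  =>  a - b <= 4999 ms
        case Gt => offsetMs + 1 // a >  b - 5 sec  =>  a - b >= -4999 ms
        case _  => offsetMs     // <= and >= are already inclusive
      }

      def main(args: Array[String]): Unit = {
        println(reduceSide(Seq(0L, -1000L, -2000L))) // -3000
        println(inclusiveBound(5000L, Lt))           // 4999
        println(inclusiveBound(-5000L, Gt))          // -4999
      }
    }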

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
deleted file mode 100644
index 769ba55..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.updateutils
-
-import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.plan.nodes.datastream._
-import _root_.scala.collection.JavaConverters._
-
-object UpdateCheckUtils {
-
-  /** Validates that the plan produces only append changes. */
-  def isAppendOnly(plan: RelNode): Boolean = {
-    val appendOnlyValidator = new AppendOnlyValidator
-    appendOnlyValidator.go(plan)
-
-    appendOnlyValidator.isAppendOnly
-  }
-
-  /** Extracts the unique keys of the table produced by the plan. */
-  def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
-    val keyExtractor = new UniqueKeyExtractor
-    keyExtractor.go(plan)
-    keyExtractor.keys
-  }
-
-  private class AppendOnlyValidator extends RelVisitor {
-
-    var isAppendOnly = true
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case s: DataStreamRel if s.producesUpdates =>
-          isAppendOnly = false
-        case _ =>
-          super.visit(node, ordinal, parent)
-      }
-    }
-  }
-
-  /** Identifies unique key fields in the output of a RelNode. */
-  private class UniqueKeyExtractor extends RelVisitor {
-
-    var keys: Option[Array[String]] = None
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case c: DataStreamCalc =>
-          super.visit(node, ordinal, parent)
-          // check if input has keys
-          if (keys.isDefined) {
-            // track keys forward
-            val inNames = c.getInput.getRowType.getFieldNames
-            val inOutNames = c.getProgram.getNamedProjects.asScala
-              .map(p => {
-                c.getProgram.expandLocalRef(p.left) match {
-                  // output field is forwarded input field
-                  case i: RexInputRef => (i.getIndex, p.right)
-                  // output field is renamed input field
-                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
-                    a.getOperands.get(0) match {
-                      case ref: RexInputRef =>
-                        (ref.getIndex, p.right)
-                      case _ =>
-                        (-1, p.right)
-                    }
-                  // output field is not forwarded from input
-                  case _: RexNode => (-1, p.right)
-                }
-              })
-              // filter all non-forwarded fields
-              .filter(_._1 >= 0)
-              // resolve names of input fields
-              .map(io => (inNames.get(io._1), io._2))
-
-            // filter by input keys
-            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
-            // check if all keys have been preserved
-            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
-              // all keys have been preserved (but possibly renamed)
-              keys = Some(outKeys.toArray)
-            } else {
-              // some (or all) keys have been removed. The output no longer has a unique key
-              keys = None
-            }
-          }
-        case _: DataStreamOverAggregate =>
-          super.visit(node, ordinal, parent)
-        // keys are always forwarded by Over aggregate
-        case a: DataStreamGroupAggregate =>
-          // get grouping keys
-          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
-          keys = Some(groupKeys.toArray)
-        case w: DataStreamGroupWindowAggregate =>
-          // get grouping keys
-          val groupKeys =
-            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
-          // get window start and end time
-          val windowStartEnd = w.getWindowProperties.map(_.name)
-          // we have only a unique key if at least one window property is selected
-          if (windowStartEnd.nonEmpty) {
-            keys = Some(groupKeys ++ windowStartEnd)
-          }
-        case _: DataStreamRel =>
-          // anything else does not forward keys or might duplicate key, so we can stop
-          keys = None
-      }
-    }
-
-  }
-
-}
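
The unique-key tracking removed here reduces, for a Calc node, to a simple rule: the
unique key survives a projection only if every key field is forwarded, possibly under a
new name. A minimal sketch of that rule (names are hypothetical, not Flink API):

    // Hypothetical sketch of the key-forwarding rule in UniqueKeyExtractor.
    object KeyForwardSketch {

      // mapping: input field name -> output field name for all forwarded fields
      def forwardKeys(keys: Seq[String], mapping: Map[String, String]): Option[Seq[String]] = {
        val outKeys = keys.flatMap(mapping.get)
        // the key is only preserved if every key field survives the projection
        if (outKeys.nonEmpty && outKeys.length == keys.length) Some(outKeys) else None
      }

      def main(args: Array[String]): Unit = {
        // all key fields forwarded (b renamed to y): key is preserved
        println(forwardKeys(Seq("a", "b"), Map("a" -> "x", "b" -> "y", "c" -> "z")))
        // key field b dropped: output is no longer unique
        println(forwardKeys(Seq("a", "b"), Map("a" -> "x")))
      }
    }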

http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
deleted file mode 100644
index d4ff3f7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit._
-
-import scala.collection.mutable
-
-class JoinITCase extends StreamingWithStateTestBase {
-
-  val data = List(
-    (1L, 1, "Hello"),
-    (2L, 2, "Hello"),
-    (3L, 3, "Hello"),
-    (4L, 4, "Hello"),
-    (5L, 5, "Hello"),
-    (6L, 6, "Hello"),
-    (7L, 7, "Hello World"),
-    (8L, 8, "Hello World"),
-    (20L, 20, "Hello World"))
-
-  /** test process time inner join **/
-  @Test
-  def testProcessTimeInnerJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
-      "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
-
-    val data1 = new mutable.MutableList[(Int, Long, String)]
-    data1.+=((1, 1L, "Hi1"))
-    data1.+=((1, 2L, "Hi2"))
-    data1.+=((1, 5L, "Hi3"))
-    data1.+=((2, 7L, "Hi5"))
-    data1.+=((1, 9L, "Hi6"))
-    data1.+=((1, 8L, "Hi8"))
-
-    val data2 = new mutable.MutableList[(Int, Long, String)]
-    data2.+=((1, 1L, "HiHi"))
-    data2.+=((2, 2L, "HeHe"))
-
-    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-  }
-
-  /** test process time inner join with other condition **/
-  @Test
-  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
-      "t1.proctime between t2.proctime - interval '5' second " +
-      "and t2.proctime + interval '5' second " +
-      "and t1.b > t2.b and t1.b + t2.b < 14"
-
-    val data1 = new mutable.MutableList[(String, Long, String)]
-    data1.+=(("1", 1L, "Hi1"))
-    data1.+=(("1", 2L, "Hi2"))
-    data1.+=(("1", 5L, "Hi3"))
-    data1.+=(("2", 7L, "Hi5"))
-    data1.+=(("1", 9L, "Hi6"))
-    data1.+=(("1", 8L, "Hi8"))
-
-    val data2 = new mutable.MutableList[(String, Long, String)]
-    data2.+=(("1", 5L, "HiHi"))
-    data2.+=(("2", 2L, "HeHe"))
-
-    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-  }
-
-}
-
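
For reference, the BETWEEN predicate used in the deleted tests expands into two inclusive
range predicates, so the strict-bound adjustment does not apply. The join window it
defines can be sketched as follows (hypothetical helper, not the runtime implementation):

    // Hypothetical check of the join window implied by
    //   t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND
    //                   AND t2.proctime + INTERVAL '5' SECOND
    object BetweenBounds {
      val leftLowerBound: Long = -5000L // t1.proctime - t2.proctime >= -5000 ms
      val leftUpperBound: Long = 5000L  // t1.proctime - t2.proctime <=  5000 ms

      // a pair of rows joins if their time difference lies within the bounds
      def withinWindow(t1TimeMs: Long, t2TimeMs: Long): Boolean = {
        val diff = t1TimeMs - t2TimeMs
        diff >= leftLowerBound && diff <= leftUpperBound
      }

      def main(args: Array[String]): Unit = {
        println(withinWindow(1000L, 4000L)) // true:  difference is -3000 ms
        println(withinWindow(1000L, 9000L)) // false: difference is -8000 ms
      }
    }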