You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:44 UTC

[flink] 11/11: [FLINK-9714][table] Support versioned joins with processing time

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00add9cdc02ccfacb38a566099d877e4669c6f65
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Sep 20 14:15:48 2018 +0200

    [FLINK-9714][table] Support versioned joins with processing time
---
 .../logical/rel/LogicalTemporalTableJoin.scala     |  42 +-
 .../apache/flink/table/plan/nodes/CommonJoin.scala |   9 +
 ...taStreamTemporalJoinToCoProcessTranslator.scala | 237 +++++++++++
 .../datastream/DataStreamTemporalTableJoin.scala   |  29 +-
 .../flink/table/runtime/join/TemporalJoin.scala    |  93 +++++
 .../TemporalTableJoinValidationTest.scala          |  72 +++-
 .../table/runtime/harness/HarnessTestBase.scala    |   7 +
 .../runtime/harness/TemporalJoinHarnessTest.scala  | 452 +++++++++++++++++++++
 .../runtime/stream/sql/TemporalJoinITCase.scala    | 135 ++++++
 .../runtime/utils/StreamingWithStateTestBase.scala |   7 +-
 10 files changed, 1068 insertions(+), 15 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala
index 3b1d51b..7fdc5e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala
@@ -23,7 +23,7 @@ import java.util.Collections
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core._
-import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexNode}
 import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
 import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
 import org.apache.flink.util.Preconditions.checkArgument
@@ -101,6 +101,38 @@ object LogicalTemporalTableJoin {
         OperandTypes.ANY)),
     SqlFunctionCategory.SYSTEM)
 
+  def isRowtimeCall(call: RexCall): Boolean = {
+    checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+    call.getOperands.size() == 3
+  }
+
+  def isProctimeCall(call: RexCall): Boolean = {
+    checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+    call.getOperands.size() == 2
+  }
+
+  def makeRowTimeTemporalJoinConditionCall(
+      rexBuilder: RexBuilder,
+      leftTimeAttribute: RexNode,
+      rightTimeAttribute: RexNode,
+      rightPrimaryKeyExpression: RexNode): RexNode = {
+    rexBuilder.makeCall(
+      TEMPORAL_JOIN_CONDITION,
+      leftTimeAttribute,
+      rightTimeAttribute,
+      rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+      rexBuilder: RexBuilder,
+      leftTimeAttribute: RexNode,
+      rightPrimaryKeyExpression: RexNode): RexNode = {
+    rexBuilder.makeCall(
+      TEMPORAL_JOIN_CONDITION,
+      leftTimeAttribute,
+      rightPrimaryKeyExpression)
+  }
+
   /**
     * See [[LogicalTemporalTableJoin]]
     */
@@ -119,8 +151,8 @@ object LogicalTemporalTableJoin {
       traitSet,
       left,
       right,
-      rexBuilder.makeCall(
-        TEMPORAL_JOIN_CONDITION,
+      makeRowTimeTemporalJoinConditionCall(
+        rexBuilder,
         leftTimeAttribute,
         rightTimeAttribute,
         rightPrimaryKeyExpression))
@@ -148,8 +180,8 @@ object LogicalTemporalTableJoin {
       traitSet,
       left,
       right,
-      rexBuilder.makeCall(
-        TEMPORAL_JOIN_CONDITION,
+      makeProcTimeTemporalJoinConditionCall(
+        rexBuilder,
         leftTimeAttribute,
         rightPrimaryKeyExpression))
   }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
index 7d0ca35..3d98a4d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -48,6 +48,15 @@ trait CommonJoin {
     }
   }
 
+  private[flink] def temporalJoinToString(
+      inputType: RelDataType,
+      joinCondition: RexNode,
+      joinType: JoinRelType,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    "Temporal" + joinToString(inputType, joinCondition, joinType, expression)
+  }
+
   private[flink] def joinToString(
       inputType: RelDataType,
       joinCondition: RexNode,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
new file mode 100644
index 0000000..467e84f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.join.TemporalJoin
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+class DataStreamTemporalJoinToCoProcessTranslator private (
+    textualRepresentation: String,
+    config: TableConfig,
+    returnType: TypeInformation[Row],
+    leftSchema: RowSchema,
+    rightSchema: RowSchema,
+    joinInfo: JoinInfo,
+    rexBuilder: RexBuilder,
+    leftTimeAttribute: RexNode,
+    rightTimeAttribute: Option[RexNode],
+    rightPrimaryKeyExpression: RexNode,
+    remainingNonEquiJoinPredicates: RexNode)
+  extends DataStreamJoinToCoProcessTranslator(
+    config,
+    returnType,
+    leftSchema,
+    rightSchema,
+    joinInfo,
+    rexBuilder) {
+
+  override val nonEquiJoinPredicates: Option[RexNode] = Some(remainingNonEquiJoinPredicates)
+
+  override protected def createCoProcessFunction(
+      joinType: JoinRelType,
+      queryConfig: StreamQueryConfig,
+      joinFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
+    : CoProcessFunction[CRow, CRow, CRow] = {
+
+    if (rightTimeAttribute.isDefined) {
+      throw new ValidationException(
+        s"Currently only proctime temporal joins are supported in [$textualRepresentation]")
+    }
+
+    joinType match {
+      case JoinRelType.INNER =>
+        new TemporalJoin(
+          leftSchema.typeInfo,
+          rightSchema.typeInfo,
+          joinFunction.name,
+          joinFunction.code,
+          queryConfig)
+      case _ =>
+       throw new ValidationException(
+         s"Only ${JoinRelType.INNER} temporal join is supported in [$textualRepresentation]")
+    }
+  }
+}
+
+object DataStreamTemporalJoinToCoProcessTranslator {
+  def create(
+    textualRepresentation: String,
+    config: TableConfig,
+    returnType: TypeInformation[Row],
+    leftSchema: RowSchema,
+    rightSchema: RowSchema,
+    joinInfo: JoinInfo,
+    rexBuilder: RexBuilder): DataStreamTemporalJoinToCoProcessTranslator = {
+
+    checkState(
+      !joinInfo.isEqui,
+      "Missing %s in join condition",
+      TEMPORAL_JOIN_CONDITION)
+
+    val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+    val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+      textualRepresentation,
+      leftSchema.typeInfo.getTotalFields,
+      joinInfo,
+      rexBuilder)
+
+    val remainingNonEquiJoinPredicates = temporalJoinConditionExtractor.apply(nonEquiJoinRex)
+
+    checkState(
+      temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
+        temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
+      "Missing %s in join condition",
+      TEMPORAL_JOIN_CONDITION)
+
+    new DataStreamTemporalJoinToCoProcessTranslator(
+      textualRepresentation,
+      config,
+      returnType,
+      leftSchema,
+      rightSchema,
+      joinInfo,
+      rexBuilder,
+      temporalJoinConditionExtractor.leftTimeAttribute.get,
+      temporalJoinConditionExtractor.rightTimeAttribute,
+      temporalJoinConditionExtractor.rightPrimaryKeyExpression.get,
+      remainingNonEquiJoinPredicates)
+  }
+
+  private class TemporalJoinConditionExtractor(
+      textualRepresentation: String,
+      rightKeysStartingOffset: Int,
+      joinInfo: JoinInfo,
+      rexBuilder: RexBuilder)
+
+    extends RexShuttle {
+
+    var leftTimeAttribute: Option[RexNode] = None
+
+    var rightTimeAttribute: Option[RexNode] = None
+
+    var rightPrimaryKeyExpression: Option[RexNode] = None
+
+    override def visitCall(call: RexCall): RexNode = {
+      if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+        return super.visitCall(call)
+      }
+
+      checkState(
+        leftTimeAttribute.isEmpty
+        && rightPrimaryKeyExpression.isEmpty
+        && rightTimeAttribute.isEmpty,
+        "Multiple %s functions in [%s]",
+        TEMPORAL_JOIN_CONDITION,
+        textualRepresentation)
+
+      if (LogicalTemporalTableJoin.isRowtimeCall(call)) {
+        leftTimeAttribute = Some(call.getOperands.get(0))
+        rightTimeAttribute = Some(call.getOperands.get(1))
+
+        rightPrimaryKeyExpression = Some(validateRightPrimaryKey(call.getOperands.get(2)))
+
+        if (!isRowtimeIndicatorType(rightTimeAttribute.get.getType)) {
+          throw new ValidationException(
+            s"Non rowtime timeAttribute [${rightTimeAttribute.get.getType}] " +
+              s"used to create TemporalTableFunction")
+        }
+        if (!isRowtimeIndicatorType(leftTimeAttribute.get.getType)) {
+          throw new ValidationException(
+            s"Non rowtime timeAttribute [${leftTimeAttribute.get.getType}] " +
+              s"passed as the argument to TemporalTableFunction")
+        }
+
+        throw new TableException("Event time temporal joins are not yet supported.")
+      }
+      else if (LogicalTemporalTableJoin.isProctimeCall(call)) {
+        leftTimeAttribute = Some(call.getOperands.get(0))
+        rightPrimaryKeyExpression = Some(validateRightPrimaryKey(call.getOperands.get(1)))
+
+        if (!isProctimeIndicatorType(leftTimeAttribute.get.getType)) {
+          throw new ValidationException(
+            s"Non processing timeAttribute [${leftTimeAttribute.get.getType}] " +
+              s"passed as the argument to TemporalTableFunction")
+        }
+      }
+      else {
+        throw new IllegalStateException(
+          s"Unsupported invocation $call in [$textualRepresentation]")
+      }
+      rexBuilder.makeLiteral(true)
+    }
+
+    private def validateRightPrimaryKey(rightPrimaryKey: RexNode): RexNode = {
+      if (joinInfo.rightKeys.size() != 1) {
+        throw new ValidationException(
+          s"Only single column join key is supported. " +
+            s"Found ${joinInfo.rightKeys} in [$textualRepresentation]")
+      }
+      val rightKey = joinInfo.rightKeys.get(0) + rightKeysStartingOffset
+
+      val primaryKeyVisitor = new PrimaryKeyVisitor(textualRepresentation)
+      rightPrimaryKey.accept(primaryKeyVisitor)
+
+      primaryKeyVisitor.inputReference match {
+        case None =>
+          throw new IllegalStateException(
+            s"Failed to find primary key reference in [$textualRepresentation]")
+        case Some(primaryKeyInputReference) if primaryKeyInputReference != rightKey =>
+          throw new ValidationException(
+            s"Join key [$rightKey] must be the same as " +
+              s"temporal table's primary key [$primaryKeyInputReference] " +
+              s"in [$textualRepresentation]")
+        case _ =>
+          rightPrimaryKey
+      }
+    }
+  }
+
+  /**
+    * Extracts input references from primary key expression.
+    */
+  private class PrimaryKeyVisitor(textualRepresentation: String)
+    extends RexDefaultVisitor[RexNode] {
+
+    var inputReference: Option[Int] = None
+
+    override def visitInputRef(inputRef: RexInputRef): RexNode = {
+      inputReference = Some(inputRef.getIndex)
+      inputRef
+    }
+
+    override def visitNode(rexNode: RexNode): RexNode = {
+      throw new ValidationException(
+        s"Unsupported right primary key expression [$rexNode] in [$textualRepresentation]")
+    }
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala
index 60f36d3..d0c2e96 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala
@@ -22,12 +22,20 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.runtime.CRowKeySelector
+import org.apache.flink.table.runtime.join._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
 import org.apache.flink.util.Preconditions.checkState
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
 /**
   * RelNode for a stream join with [[org.apache.flink.table.functions.TemporalTableFunction]].
   */
@@ -74,9 +82,18 @@ class DataStreamTemporalTableJoin(
       ruleDescription)
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      queryConfig: StreamQueryConfig): DataStream[CRow] = {
-    throw new NotImplementedError()
-  }
-}
+  /**
+    * Builds the [[DataStreamTemporalJoinToCoProcessTranslator]] for this node. `this.toString`
+    * is passed as the textual representation that the translator puts into its error messages.
+    */
+  override protected def createTranslator(
+      tableEnv: StreamTableEnvironment): DataStreamJoinToCoProcessTranslator = {
+    DataStreamTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      tableEnv.getConfig,
+      schema.typeInfo,
+      leftSchema,
+      rightSchema,
+      joinInfo,
+      cluster.getRexBuilder)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala
new file mode 100644
index 0000000..5087515
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Joins every left-side row with the right-side row currently held in state.
+  *
+  * The right side is kept in a single [[ValueState]]: a right-side message whose change flag
+  * is set replaces the stored row, any other right-side message clears it. A left-side row
+  * that arrives while nothing is stored produces no output.
+  *
+  * @param leftType type information of the left input rows
+  * @param rightType type information of the right input rows; also the type of the state
+  * @param genJoinFuncName name of the generated [[FlatJoinFunction]] class
+  * @param genJoinFuncCode code of the generated [[FlatJoinFunction]] class
+  * @param queryConfig streaming query configuration; currently not read by this class
+  */
+class TemporalJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  protected var rightState: ValueState[Row] = _
+  protected var cRowWrapper: CRowWrappingCollector = _
+
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /**
+    * Compiles and instantiates the generated join function and sets up the right-side state
+    * and the output collector wrapper.
+    */
+  override def open(parameters: Configuration): Unit = {
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+
+    joinFunction = clazz.newInstance()
+
+    val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
+    rightState = getRuntimeContext.getState(rightStateDescriptor)
+
+    cRowWrapper = new CRowWrappingCollector()
+  }
+
+  /**
+    * Joins a left-side row with the stored right-side row; emits nothing if no row is stored.
+    */
+  override def processElement1(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    // Read the state once and reuse the result instead of querying it twice per record.
+    val rightSideRow = rightState.value()
+    if (rightSideRow != null) {
+      // Hand the incoming row's change flag to the collector wrapper before joining.
+      cRowWrapper.out = out
+      cRowWrapper.setChange(value.change)
+      joinFunction.join(value.row, rightSideRow, cRowWrapper)
+    }
+  }
+
+  /**
+    * Updates the right-side state: a message with the change flag set replaces the stored row,
+    * any other message clears it. Nothing is emitted.
+    */
+  override def processElement2(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    if (value.change) {
+      rightState.update(value.row)
+    } else {
+      rightState.clear()
+    }
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
index ab282ec..58ad493 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.stream.table.validation
 import java.sql.Timestamp
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils._
 import org.junit.Test
@@ -33,9 +33,18 @@ class TemporalTableJoinValidationTest extends TableTestBase {
   val orders = util.addTable[(Long, String, Timestamp)](
     "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
 
+  val ordersProctime = util.addTable[(Long, String)](
+    "OrdersProctime", 'o_amount, 'o_currency, 'o_rowtime.proctime)
+
+  val ordersWithoutTimeAttribute = util.addTable[(Long, String, Timestamp)](
+    "OrdersWithoutTimeAttribute", 'o_amount, 'o_currency, 'o_rowtime)
+
   val ratesHistory = util.addTable[(String, Int, Timestamp)](
     "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
 
+  val ratesHistoryWithoutTimeAttribute = util.addTable[(String, Int, Timestamp)](
+    "ratesHistoryWithoutTimeAttribute", 'currency, 'rate, 'rowtime)
+
   @Test
   def testInvalidFieldReference(): Unit = {
     expectedException.expect(classOf[ValidationException])
@@ -51,4 +60,65 @@ class TemporalTableJoinValidationTest extends TableTestBase {
 
     ratesHistory.createTemporalTableFunction("rowtime", "foobar")
   }
+
+  @Test
+  def testNonTimeIndicatorOnRightSide(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Non rowtime timeAttribute [TIMESTAMP(3)] used to create TemporalTableFunction")
+
+    val rates = ratesHistoryWithoutTimeAttribute.createTemporalTableFunction('rowtime, 'currency)
+
+    val result = orders
+      .join(rates('o_rowtime), "currency = o_currency")
+      .select("o_amount * rate").as("rate")
+
+    util.explain(result)
+  }
+
+  @Test
+  def testNonTimeIndicatorOnLeftSide(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Non rowtime timeAttribute [TIMESTAMP(3)] passed as the argument to TemporalTableFunction")
+
+    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+
+    val result = ordersWithoutTimeAttribute
+      .join(rates('o_rowtime), "currency = o_currency")
+      .select("o_amount * rate").as("rate")
+
+    util.explain(result)
+  }
+
+  @Test
+  def testMixedTimeIndicators(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Non rowtime timeAttribute [TIME ATTRIBUTE(PROCTIME)] passed as the argument " +
+        "to TemporalTableFunction")
+
+    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+
+    val result = ordersProctime
+      .join(rates('o_rowtime), "currency = o_currency")
+      .select("o_amount * rate").as("rate")
+
+    util.explain(result)
+  }
+
+  @Test
+  def testEventTimeIndicators(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage(
+      "Event time temporal joins are not yet supported.")
+
+    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+
+    val result = orders
+      .join(rates('o_rowtime), "currency = o_currency")
+      .select("o_amount * rate").as("rate")
+
+    util.explain(result)
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index d494c21..28b7d14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -36,8 +36,15 @@ import org.apache.flink.table.functions.aggfunctions.{IntSumWithRetractAggFuncti
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
 import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.junit.Rule
+import org.junit.rules.ExpectedException
 
 class HarnessTestBase {
+  // used for accurate exception information checking.
+  val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrown = expectedException
 
   val longMinWithRetractAggFunction: String =
     UserDefinedFunctionUtils.serialize(new LongMinWithRetractAggFunction)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
new file mode 100644
index 0000000..43f3548
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableIntList
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.{TableConfig, Types, ValidationException}
+import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TestStreamQueryConfig}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.hamcrest.Matchers.startsWith
+import org.junit.Test
+
+class TemporalJoinHarnessTest extends HarnessTestBase {
+
+  private val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+
+  private val tableConfig = new TableConfig
+
+  private val queryConfig =
+    new TestStreamQueryConfig(Time.milliseconds(2), Time.milliseconds(4))
+
+  private val ORDERS_KEY = "o_currency"
+
+  private val ORDERS_PROCTIME = "o_proctime"
+
+  private val RATES_KEY = "r_currency"
+
+  private val ordersRowtimeType = new RowTypeInfo(
+    Array[TypeInformation[_]](
+      Types.LONG,
+      Types.STRING,
+      TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
+    Array("o_amount", ORDERS_KEY, "o_rowtime"))
+
+  private val ordersProctimeType = new RowTypeInfo(
+    Array[TypeInformation[_]](
+      Types.LONG,
+      Types.STRING,
+      TimeIndicatorTypeInfo.PROCTIME_INDICATOR),
+    Array("o_amount", ORDERS_KEY, ORDERS_PROCTIME))
+
+  private val ratesRowtimeType = new RowTypeInfo(
+    Array[TypeInformation[_]](
+      Types.STRING,
+      Types.LONG,
+      TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
+    Array(RATES_KEY, "r_rate", "r_rowtime"))
+
+  private val ratesProctimeType = new RowTypeInfo(
+    Array[TypeInformation[_]](
+      Types.STRING,
+      Types.LONG,
+      TimeIndicatorTypeInfo.PROCTIME_INDICATOR),
+    Array(RATES_KEY, "r_rate", "r_proctime"))
+
+  private val joinRowtimeType = new RowTypeInfo(
+    ordersRowtimeType.getFieldTypes ++ ratesRowtimeType.getFieldTypes,
+    ordersRowtimeType.getFieldNames ++ ratesRowtimeType.getFieldNames)
+
+  private val rexBuilder = new RexBuilder(typeFactory)
+
+  @Test
+  def testProctime() {
+    val testHarness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo)
+
+    testHarness.open()
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // order sent before any rate has been sent: the expected output has no row for it
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", null)))
+
+    // register the initial conversion rates
+    testHarness.processElement2(new StreamRecord(CRow("US Dollar", 102L, null)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, null)))
+    testHarness.processElement2(new StreamRecord(CRow("Yen", 1L, null)))
+
+    // each order is now expected to be joined with the rate sent for its currency
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", null)))
+    testHarness.processElement1(new StreamRecord(CRow(1L, "US Dollar", null)))
+    testHarness.processElement1(new StreamRecord(CRow(50L, "Yen", null)))
+
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", null, "Euro", 114L, null)))
+    expectedOutput.add(new StreamRecord(CRow(1L, "US Dollar", null, "US Dollar", 102L, null)))
+    expectedOutput.add(new StreamRecord(CRow(50L, "Yen", null, "Yen", 1L, null)))
+
+    // send a newer Euro rate (114 -> 116)
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 116L, null)))
+
+    // the next Euro order is expected to see the updated rate
+    testHarness.processElement1(new StreamRecord(CRow(3L, "Euro", null)))
+
+    expectedOutput.add(new StreamRecord(CRow(3L, "Euro", null, "Euro", 116L, null)))
+
+    // update Euro once more (116 -> 119); no Euro order follows, so 119 is never expected
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 119L, null)))
+
+    // US Dollar was not touched by the Euro updates and is still expected to join with 102
+    testHarness.processElement1(new StreamRecord(CRow(5L, "US Dollar", null)))
+
+    expectedOutput.add(new StreamRecord(CRow(5L, "US Dollar", null, "US Dollar", 102L, null)))
+
+    verify(expectedOutput, testHarness.getOutput, new RowResultSortComparator())
+
+    testHarness.close()
+  }
+
+  @Test
+  def testNonEquiProctime() {
+    val testHarness = createTestHarness(
+      new ProctimeTemporalJoinInfo(
+        new RowTypeInfo(
+          ordersProctimeType.getFieldTypes :+ Types.INT,
+          ordersProctimeType.getFieldNames :+ "foo"),
+        new RowTypeInfo(
+          ratesProctimeType.getFieldTypes :+ Types.INT,
+          ratesProctimeType.getFieldNames :+ "bar"),
+        ORDERS_KEY,
+        RATES_KEY,
+        ORDERS_PROCTIME) {
+        /**
+          * @return [[LogicalTemporalTableJoin.TEMPORAL_JOIN_CONDITION]](...) AND
+          *        left "foo" > right "bar" (the field appended at index 3 on each side)
+          */
+        override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+          rexBuilder.makeCall(
+            SqlStdOperatorTable.AND,
+            super.getRemaining(rexBuilder),
+            rexBuilder.makeCall(
+              SqlStdOperatorTable.GREATER_THAN,
+              makeLeftInputRef("foo"),
+              makeRightInputRef("bar")))
+        }
+      })
+
+    testHarness.open()
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // register the initial conversion rates, both with bar = 42
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, null, 42)))
+    testHarness.processElement2(new StreamRecord(CRow("Yen", 1L, null, 42)))
+
+    // foo > bar must hold: Euro (0 > 42) is not expected, Yen (44 > 42) is expected
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", null, 0)))
+    testHarness.processElement1(new StreamRecord(CRow(50L, "Yen", null, 44)))
+
+    expectedOutput.add(new StreamRecord(CRow(50L, "Yen", null, 44, "Yen", 1L, null, 42)))
+
+    // update Euro: the new rate carries bar = 44
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 116L, null, 44)))
+
+    // of the three Euro orders only foo = 1337 satisfies foo > 44
+    testHarness.processElement1(new StreamRecord(CRow(3L, "Euro", null, 42)))
+    testHarness.processElement1(new StreamRecord(CRow(4L, "Euro", null, 44)))
+    testHarness.processElement1(new StreamRecord(CRow(5L, "Euro", null, 1337)))
+
+    expectedOutput.add(new StreamRecord(CRow(5L, "Euro", null, 1337, "Euro", 116L, null, 44)))
+
+    // no US Dollar rate is sent in this test, so no output is expected for this order
+    testHarness.processElement1(new StreamRecord(CRow(5L, "US Dollar", null, 1337)))
+
+    verify(expectedOutput, testHarness.getOutput, new RowResultSortComparator())
+
+    testHarness.close()
+  }
+
+  @Test
+  def testMissingTemporalJoinCondition() {
+    expectedException.expect(classOf[IllegalStateException])
+    expectedException.expectMessage(startsWith(s"Missing ${TEMPORAL_JOIN_CONDITION.getName}"))
+
+    translateJoin(new TemporalJoinInfo(
+      ordersProctimeType,
+      ratesProctimeType,
+      ORDERS_KEY,
+      RATES_KEY) {
+
+      override def isEqui: Boolean = true
+
+      override def getRemaining(rexBuilder: RexBuilder): RexNode = rexBuilder.makeLiteral(true)
+    })
+  }
+
+  @Test
+  def testNonEquiMissingTemporalJoinCondition() {
+    expectedException.expect(classOf[IllegalStateException])
+    expectedException.expectMessage(startsWith(s"Missing ${TEMPORAL_JOIN_CONDITION.getName}"))
+
+    translateJoin(new TemporalJoinInfo(
+      ordersProctimeType,
+      ratesProctimeType,
+      ORDERS_KEY,
+      RATES_KEY) {
+
+      override def isEqui: Boolean = false // the remaining predicate below is non-equi
+
+      override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+        rexBuilder.makeCall(
+          SqlStdOperatorTable.GREATER_THAN,
+          rexBuilder.makeCall(
+            SqlStdOperatorTable.CONCAT,
+            rexBuilder.makeLiteral("A"),
+            makeLeftInputRef(ORDERS_KEY)),
+          makeRightInputRef(RATES_KEY))
+      }
+    })
+  }
+
+  @Test
+  def testTwoTemporalJoinConditions() {
+    expectedException.expect(classOf[IllegalStateException])
+    expectedException.expectMessage(startsWith(s"Multiple $TEMPORAL_JOIN_CONDITION functions"))
+
+    translateJoin(
+      new OrdersRatesProctimeTemporalJoinInfo() {
+        override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+          rexBuilder.makeCall(
+            SqlStdOperatorTable.OR,
+            super.getRemaining(rexBuilder),
+            super.getRemaining(rexBuilder))
+        }
+      })
+  }
+
+  @Test
+  def testIncorrectTemporalJoinCondition() {
+    expectedException.expect(classOf[IllegalStateException])
+    expectedException.expectMessage(startsWith("Unsupported invocation"))
+
+    translateJoin(
+      new OrdersRatesProctimeTemporalJoinInfo() {
+        override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+          rexBuilder.makeCall(
+            TEMPORAL_JOIN_CONDITION, // deliberately invoked with four operands
+            makeLeftInputRef(leftKey),
+            makeLeftInputRef(leftKey),
+            makeLeftInputRef(leftKey),
+            makeRightInputRef(rightKey))
+        }
+      })
+  }
+
+  @Test
+  def testUnsupportedPrimaryKeyInTemporalJoinCondition() {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(startsWith("Unsupported right primary key expression"))
+
+    translateJoin(
+      new OrdersRatesProctimeTemporalJoinInfo() {
+        override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+          LogicalTemporalTableJoin.makeProcTimeTemporalJoinConditionCall(
+            rexBuilder,
+            makeLeftInputRef(leftTimeAttribute),
+            rexBuilder.makeCall(
+              SqlStdOperatorTable.CONCAT,
+              rexBuilder.makeLiteral("A"),
+              makeRightInputRef(RATES_KEY)))
+        }
+      })
+  }
+
+  @Test
+  def testMultipleJoinKeys() {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(startsWith("Only single column join key"))
+
+    translateJoin(
+      new TemporalJoinInfo(
+        ordersProctimeType,
+        ratesProctimeType,
+        ImmutableIntList.of(0, 1), // two left key columns instead of a single one
+        ImmutableIntList.of(1, 0)) { // two right key columns instead of a single one
+
+        override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+          LogicalTemporalTableJoin.makeProcTimeTemporalJoinConditionCall(
+            rexBuilder,
+            makeLeftInputRef(ORDERS_PROCTIME),
+            makeRightInputRef(RATES_KEY))
+        }
+      })
+  }
+
+  @Test
+  def testNonInnerJoin() {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(startsWith(s"Only ${JoinRelType.INNER} temporal join"))
+
+    translateJoin(new OrdersRatesProctimeTemporalJoinInfo, JoinRelType.FULL)
+  }
+
+  def createTestHarness(temporalJoinInfo: TemporalJoinInfo)
+    : KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = {
+
+    val (leftKeySelector, rightKeySelector, joinCoProcessFunction) =
+      translateJoin(temporalJoinInfo)
+
+    val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinCoProcessFunction)
+
+    new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
+      operator,
+      leftKeySelector.asInstanceOf[KeySelector[CRow, String]],
+      rightKeySelector.asInstanceOf[KeySelector[CRow, String]],
+      BasicTypeInfo.STRING_TYPE_INFO, // keys are Strings (the currency fields)
+      1, // presumably maxParallelism -- TODO confirm against the harness constructor
+      1, // presumably numSubtasks -- TODO confirm
+      0) // presumably subtaskIndex -- TODO confirm
+  }
+
+  def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = JoinRelType.INNER)
+    : (CRowKeySelector, CRowKeySelector, CoProcessFunction[CRow, CRow, CRow]) = {
+
+    val leftType = joinInfo.leftRowType
+    val rightType = joinInfo.rightRowType
+    val joinType = new RowTypeInfo(
+      leftType.getFieldTypes ++ rightType.getFieldTypes,
+      leftType.getFieldNames ++ rightType.getFieldNames)
+
+    val joinTranslator = DataStreamTemporalJoinToCoProcessTranslator.create(
+      "TemporalJoin",
+      tableConfig,
+      joinType,
+      new RowSchema(typeFactory.createTypeFromTypeInfo(leftType, false)),
+      new RowSchema(typeFactory.createTypeFromTypeInfo(rightType, false)),
+      joinInfo,
+      rexBuilder)
+
+    val joinCoProcessFunction = joinTranslator.getCoProcessFunction(
+      joinRelType,
+      joinType.getFieldNames,
+      "TemporalJoin",
+      queryConfig)
+
+    (joinTranslator.getLeftKeySelector(),
+      joinTranslator.getRightKeySelector(),
+      joinCoProcessFunction)
+  }
+
+  abstract class TemporalJoinInfo(
+      val leftRowType: RowTypeInfo,
+      val rightRowType: RowTypeInfo,
+      leftKeys: ImmutableIntList,
+      rightKeys: ImmutableIntList)
+    extends JoinInfo(leftKeys, rightKeys) {
+
+    def this(
+      leftRowType: RowTypeInfo,
+      rightRowType: RowTypeInfo,
+      leftKey: String,
+      rightKey: String) =
+      this(
+        leftRowType,
+        rightRowType,
+        ImmutableIntList.of(leftRowType.getFieldIndex(leftKey)),
+        ImmutableIntList.of(rightRowType.getFieldIndex(rightKey)))
+
+    override def isEqui: Boolean = false
+
+    def makeLeftInputRef(leftField: String): RexNode = {
+      rexBuilder.makeInputRef(
+        typeFactory.createTypeFromTypeInfo(leftRowType.getTypeAt(leftField), false),
+        leftRowType.getFieldIndex(leftField))
+    }
+
+    def makeRightInputRef(rightField: String): RexNode = {
+      rexBuilder.makeInputRef(
+        typeFactory.createTypeFromTypeInfo(rightRowType.getTypeAt(rightField), false),
+        rightRowType.getFieldIndex(rightField) + leftRowType.getFieldTypes.length)
+    }
+  }
+
+  class OrdersRatesProctimeTemporalJoinInfo()
+    extends ProctimeTemporalJoinInfo(
+      ordersProctimeType,
+      ratesProctimeType,
+      ORDERS_KEY,
+      RATES_KEY,
+      ORDERS_PROCTIME)
+
+  class ProctimeTemporalJoinInfo(
+      leftRowType: RowTypeInfo,
+      rightRowType: RowTypeInfo,
+      val leftKey: String,
+      val rightKey: String,
+      val leftTimeAttribute: String)
+    extends TemporalJoinInfo(leftRowType, rightRowType, leftKey, rightKey) {
+
+    override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+      LogicalTemporalTableJoin.makeProcTimeTemporalJoinConditionCall(
+        rexBuilder,
+        makeLeftInputRef(leftTimeAttribute),
+        makeRightInputRef(rightKey))
+    }
+  }
+
+  class MissingTemporalJoinConditionJoinInfo(
+      leftRowType: RowTypeInfo,
+      rightRowType: RowTypeInfo,
+      leftKey: String,
+      rightKey: String,
+      isEquiJoin: Boolean)
+    extends TemporalJoinInfo(leftRowType, rightRowType, leftKey, rightKey) {
+
+    override def isEqui: Boolean = isEquiJoin
+
+    override def getRemaining(rexBuilder: RexBuilder): RexNode = if (isEquiJoin) {
+      rexBuilder.makeLiteral(true)
+    }
+    else {
+      rexBuilder.makeCall(
+        SqlStdOperatorTable.GREATER_THAN,
+        rexBuilder.makeCall(
+          SqlStdOperatorTable.CONCAT,
+          rexBuilder.makeLiteral("A"),
+          makeLeftInputRef(leftKey)),
+        makeRightInputRef(rightKey))
+    }
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
new file mode 100644
index 0000000..b0304a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class TemporalJoinITCase extends StreamingWithStateTestBase {
+
+  /**
+    * Because of the nature of processing time, we cannot (or at least not easily) validate
+    * the result here. Instead, we only test that a full-blown ITCase runs without
+    * exceptions. Actual correctness is tested in unit tests.
+    */
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val sqlQuery =
+      """
+        |SELECT
+        |  o.amount * r.rate AS amount
+        |FROM
+        |  Orders AS o,
+        |  LATERAL TABLE (Rates(o.proctime)) AS r
+        |WHERE r.currency = o.currency
+        |""".stripMargin
+
+    val ordersData = new mutable.MutableList[(Long, String)]
+    ordersData.+=((2L, "Euro"))
+    ordersData.+=((1L, "US Dollar"))
+    ordersData.+=((50L, "Yen"))
+    ordersData.+=((3L, "Euro"))
+    ordersData.+=((5L, "US Dollar"))
+
+    val ratesHistoryData = new mutable.MutableList[(String, Long)]
+    ratesHistoryData.+=(("US Dollar", 102L))
+    ratesHistoryData.+=(("Euro", 114L))
+    ratesHistoryData.+=(("Yen", 1L))
+    ratesHistoryData.+=(("Euro", 116L))
+    ratesHistoryData.+=(("Euro", 119L))
+
+    val orders = env
+      .fromCollection(ordersData)
+      .toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
+    val ratesHistory = env
+      .fromCollection(ratesHistoryData)
+      .toTable(tEnv, 'currency, 'rate, 'proctime.proctime)
+
+    tEnv.registerTable("Orders", orders)
+    tEnv.registerTable("RatesHistory", ratesHistory)
+    tEnv.registerFunction(
+      "Rates",
+      ratesHistory.createTemporalTableFunction('proctime, 'currency))
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  @Test
+  def testEventTimeInnerJoin(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("Event time temporal joins are not yet supported")
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val sqlQuery =
+      """
+        |SELECT
+        |  o.amount * r.rate AS amount
+        |FROM
+        |  Orders AS o,
+        |  LATERAL TABLE (Rates(o.rowtime)) AS r
+        |WHERE r.currency = o.currency
+        |""".stripMargin
+
+    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
+
+    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
+
+    val orders = env
+      .fromCollection(ordersData)
+      .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
+    val ratesHistory = env
+      .fromCollection(ratesHistoryData)
+      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
+
+    tEnv.registerTable("Orders", orders)
+    tEnv.registerTable("RatesHistory", ratesHistory)
+    tEnv.registerFunction(
+      "Rates",
+      ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
index b3eeb59..b8c414f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
@@ -21,10 +21,15 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
 import org.apache.flink.runtime.state.StateBackend
 import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Rule
-import org.junit.rules.TemporaryFolder
+import org.junit.rules.{ExpectedException, TemporaryFolder}
 
 class StreamingWithStateTestBase extends AbstractTestBase {
 
+  val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrown = expectedException
+
   val _tempFolder = new TemporaryFolder
 
   @Rule