You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:44 UTC
[flink] 11/11: [FLINK-9714][table] Support versioned joins with
processing time
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 00add9cdc02ccfacb38a566099d877e4669c6f65
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Sep 20 14:15:48 2018 +0200
[FLINK-9714][table] Support versioned joins with processing time
---
.../logical/rel/LogicalTemporalTableJoin.scala | 42 +-
.../apache/flink/table/plan/nodes/CommonJoin.scala | 9 +
...taStreamTemporalJoinToCoProcessTranslator.scala | 237 +++++++++++
.../datastream/DataStreamTemporalTableJoin.scala | 29 +-
.../flink/table/runtime/join/TemporalJoin.scala | 93 +++++
.../TemporalTableJoinValidationTest.scala | 72 +++-
.../table/runtime/harness/HarnessTestBase.scala | 7 +
.../runtime/harness/TemporalJoinHarnessTest.scala | 452 +++++++++++++++++++++
.../runtime/stream/sql/TemporalJoinITCase.scala | 135 ++++++
.../runtime/utils/StreamingWithStateTestBase.scala | 7 +-
10 files changed, 1068 insertions(+), 15 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala
index 3b1d51b..7fdc5e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala
@@ -23,7 +23,7 @@ import java.util.Collections
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core._
-import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexNode}
import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
import org.apache.flink.util.Preconditions.checkArgument
@@ -101,6 +101,38 @@ object LogicalTemporalTableJoin {
OperandTypes.ANY)),
SqlFunctionCategory.SYSTEM)
+ def isRowtimeCall(call: RexCall): Boolean = {
+ checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+ call.getOperands.size() == 3
+ }
+
+ def isProctimeCall(call: RexCall): Boolean = {
+ checkArgument(call.getOperator == TEMPORAL_JOIN_CONDITION)
+ call.getOperands.size() == 2
+ }
+
+ def makeRowTimeTemporalJoinConditionCall(
+ rexBuilder: RexBuilder,
+ leftTimeAttribute: RexNode,
+ rightTimeAttribute: RexNode,
+ rightPrimaryKeyExpression: RexNode): RexNode = {
+ rexBuilder.makeCall(
+ TEMPORAL_JOIN_CONDITION,
+ leftTimeAttribute,
+ rightTimeAttribute,
+ rightPrimaryKeyExpression)
+ }
+
+ def makeProcTimeTemporalJoinConditionCall(
+ rexBuilder: RexBuilder,
+ leftTimeAttribute: RexNode,
+ rightPrimaryKeyExpression: RexNode): RexNode = {
+ rexBuilder.makeCall(
+ TEMPORAL_JOIN_CONDITION,
+ leftTimeAttribute,
+ rightPrimaryKeyExpression)
+ }
+
/**
* See [[LogicalTemporalTableJoin]]
*/
@@ -119,8 +151,8 @@ object LogicalTemporalTableJoin {
traitSet,
left,
right,
- rexBuilder.makeCall(
- TEMPORAL_JOIN_CONDITION,
+ makeRowTimeTemporalJoinConditionCall(
+ rexBuilder,
leftTimeAttribute,
rightTimeAttribute,
rightPrimaryKeyExpression))
@@ -148,8 +180,8 @@ object LogicalTemporalTableJoin {
traitSet,
left,
right,
- rexBuilder.makeCall(
- TEMPORAL_JOIN_CONDITION,
+ makeProcTimeTemporalJoinConditionCall(
+ rexBuilder,
leftTimeAttribute,
rightPrimaryKeyExpression))
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
index 7d0ca35..3d98a4d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -48,6 +48,15 @@ trait CommonJoin {
}
}
+ private[flink] def temporalJoinToString(
+ inputType: RelDataType,
+ joinCondition: RexNode,
+ joinType: JoinRelType,
+ expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+ "Temporal" + joinToString(inputType, joinCondition, joinType, expression)
+ }
+
private[flink] def joinToString(
inputType: RelDataType,
joinCondition: RexNode,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
new file mode 100644
index 0000000..467e84f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.join.TemporalJoin
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+class DataStreamTemporalJoinToCoProcessTranslator private (
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: TypeInformation[Row],
+ leftSchema: RowSchema,
+ rightSchema: RowSchema,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttribute: RexNode,
+ rightTimeAttribute: Option[RexNode],
+ rightPrimaryKeyExpression: RexNode,
+ remainingNonEquiJoinPredicates: RexNode)
+ extends DataStreamJoinToCoProcessTranslator(
+ config,
+ returnType,
+ leftSchema,
+ rightSchema,
+ joinInfo,
+ rexBuilder) {
+
+ override val nonEquiJoinPredicates: Option[RexNode] = Some(remainingNonEquiJoinPredicates)
+
+ override protected def createCoProcessFunction(
+ joinType: JoinRelType,
+ queryConfig: StreamQueryConfig,
+ joinFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
+ : CoProcessFunction[CRow, CRow, CRow] = {
+
+ if (rightTimeAttribute.isDefined) {
+ throw new ValidationException(
+ s"Currently only proctime temporal joins are supported in [$textualRepresentation]")
+ }
+
+ joinType match {
+ case JoinRelType.INNER =>
+ new TemporalJoin(
+ leftSchema.typeInfo,
+ rightSchema.typeInfo,
+ joinFunction.name,
+ joinFunction.code,
+ queryConfig)
+ case _ =>
+ throw new ValidationException(
+ s"Only ${JoinRelType.INNER} temporal join is supported in [$textualRepresentation]")
+ }
+ }
+}
+
+object DataStreamTemporalJoinToCoProcessTranslator {
+ def create(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: TypeInformation[Row],
+ leftSchema: RowSchema,
+ rightSchema: RowSchema,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder): DataStreamTemporalJoinToCoProcessTranslator = {
+
+ checkState(
+ !joinInfo.isEqui,
+ "Missing %s in join condition",
+ TEMPORAL_JOIN_CONDITION)
+
+ val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+ val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+ textualRepresentation,
+ leftSchema.typeInfo.getTotalFields,
+ joinInfo,
+ rexBuilder)
+
+ val remainingNonEquiJoinPredicates = temporalJoinConditionExtractor.apply(nonEquiJoinRex)
+
+ checkState(
+ temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
+ temporalJoinConditionExtractor.rightPrimaryKeyExpression.isDefined,
+ "Missing %s in join condition",
+ TEMPORAL_JOIN_CONDITION)
+
+ new DataStreamTemporalJoinToCoProcessTranslator(
+ textualRepresentation,
+ config,
+ returnType,
+ leftSchema,
+ rightSchema,
+ joinInfo,
+ rexBuilder,
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
+ temporalJoinConditionExtractor.rightTimeAttribute,
+ temporalJoinConditionExtractor.rightPrimaryKeyExpression.get,
+ remainingNonEquiJoinPredicates)
+ }
+
+ private class TemporalJoinConditionExtractor(
+ textualRepresentation: String,
+ rightKeysStartingOffset: Int,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder)
+
+ extends RexShuttle {
+
+ var leftTimeAttribute: Option[RexNode] = None
+
+ var rightTimeAttribute: Option[RexNode] = None
+
+ var rightPrimaryKeyExpression: Option[RexNode] = None
+
+ override def visitCall(call: RexCall): RexNode = {
+ if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+ return super.visitCall(call)
+ }
+
+ checkState(
+ leftTimeAttribute.isEmpty
+ && rightPrimaryKeyExpression.isEmpty
+ && rightTimeAttribute.isEmpty,
+ "Multiple %s functions in [%s]",
+ TEMPORAL_JOIN_CONDITION,
+ textualRepresentation)
+
+ if (LogicalTemporalTableJoin.isRowtimeCall(call)) {
+ leftTimeAttribute = Some(call.getOperands.get(0))
+ rightTimeAttribute = Some(call.getOperands.get(1))
+
+ rightPrimaryKeyExpression = Some(validateRightPrimaryKey(call.getOperands.get(2)))
+
+ if (!isRowtimeIndicatorType(rightTimeAttribute.get.getType)) {
+ throw new ValidationException(
+ s"Non rowtime timeAttribute [${rightTimeAttribute.get.getType}] " +
+ s"used to create TemporalTableFunction")
+ }
+ if (!isRowtimeIndicatorType(leftTimeAttribute.get.getType)) {
+ throw new ValidationException(
+ s"Non rowtime timeAttribute [${leftTimeAttribute.get.getType}] " +
+ s"passed as the argument to TemporalTableFunction")
+ }
+
+ throw new TableException("Event time temporal joins are not yet supported.")
+ }
+ else if (LogicalTemporalTableJoin.isProctimeCall(call)) {
+ leftTimeAttribute = Some(call.getOperands.get(0))
+ rightPrimaryKeyExpression = Some(validateRightPrimaryKey(call.getOperands.get(1)))
+
+ if (!isProctimeIndicatorType(leftTimeAttribute.get.getType)) {
+ throw new ValidationException(
+ s"Non processing timeAttribute [${leftTimeAttribute.get.getType}] " +
+ s"passed as the argument to TemporalTableFunction")
+ }
+ }
+ else {
+ throw new IllegalStateException(
+ s"Unsupported invocation $call in [$textualRepresentation]")
+ }
+ rexBuilder.makeLiteral(true)
+ }
+
+ private def validateRightPrimaryKey(rightPrimaryKey: RexNode): RexNode = {
+ if (joinInfo.rightKeys.size() != 1) {
+ throw new ValidationException(
+ s"Only single column join key is supported. " +
+ s"Found ${joinInfo.rightKeys} in [$textualRepresentation]")
+ }
+ val rightKey = joinInfo.rightKeys.get(0) + rightKeysStartingOffset
+
+ val primaryKeyVisitor = new PrimaryKeyVisitor(textualRepresentation)
+ rightPrimaryKey.accept(primaryKeyVisitor)
+
+ primaryKeyVisitor.inputReference match {
+ case None =>
+ throw new IllegalStateException(
+ s"Failed to find primary key reference in [$textualRepresentation]")
+ case Some(primaryKeyInputReference) if primaryKeyInputReference != rightKey =>
+ throw new ValidationException(
+ s"Join key [$rightKey] must be the same as " +
+ s"temporal table's primary key [$primaryKeyInputReference] " +
+ s"in [$textualRepresentation]")
+ case _ =>
+ rightPrimaryKey
+ }
+ }
+ }
+
+ /**
+ * Extracts input references from primary key expression.
+ */
+ private class PrimaryKeyVisitor(textualRepresentation: String)
+ extends RexDefaultVisitor[RexNode] {
+
+ var inputReference: Option[Int] = None
+
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ inputReference = Some(inputRef.getIndex)
+ inputRef
+ }
+
+ override def visitNode(rexNode: RexNode): RexNode = {
+ throw new ValidationException(
+ s"Unsupported right primary key expression [$rexNode] in [$textualRepresentation]")
+ }
+ }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala
index 60f36d3..d0c2e96 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalTableJoin.scala
@@ -22,12 +22,20 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.codegen.FunctionCodeGenerator
import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.runtime.CRowKeySelector
+import org.apache.flink.table.runtime.join._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
import org.apache.flink.util.Preconditions.checkState
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
/**
* RelNode for a stream join with [[org.apache.flink.table.functions.TemporalTableFunction]].
*/
@@ -74,9 +82,14 @@ class DataStreamTemporalTableJoin(
ruleDescription)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- queryConfig: StreamQueryConfig): DataStream[CRow] = {
- throw new NotImplementedError()
- }
-}
+ override protected def createTranslator(
+ tableEnv: StreamTableEnvironment): DataStreamJoinToCoProcessTranslator = {
+ DataStreamTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ tableEnv.getConfig,
+ schema.typeInfo,
+ leftSchema,
+ rightSchema,
+ joinInfo,
+ cluster.getRexBuilder)
+ }}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala
new file mode 100644
index 0000000..5087515
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalJoin.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class TemporalJoin(
+ leftType: TypeInformation[Row],
+ rightType: TypeInformation[Row],
+ genJoinFuncName: String,
+ genJoinFuncCode: String,
+ queryConfig: StreamQueryConfig)
+ extends CoProcessFunction[CRow, CRow, CRow]
+ with Compiler[FlatJoinFunction[Row, Row, Row]]
+ with Logging {
+
+ validateEqualsHashCode("join", leftType)
+ validateEqualsHashCode("join", rightType)
+
+ protected var rightState: ValueState[Row] = _
+ protected var cRowWrapper: CRowWrappingCollector = _
+
+ protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+ override def open(parameters: Configuration): Unit = {
+ val clazz = compile(
+ getRuntimeContext.getUserCodeClassLoader,
+ genJoinFuncName,
+ genJoinFuncCode)
+
+ joinFunction = clazz.newInstance()
+
+ val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
+ rightState = getRuntimeContext.getState(rightStateDescriptor)
+
+ cRowWrapper = new CRowWrappingCollector()
+ }
+
+ override def processElement1(
+ value: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ if (rightState.value() == null) {
+ return
+ }
+
+ cRowWrapper.out = out
+ cRowWrapper.setChange(value.change)
+
+ val rightSideRow = rightState.value()
+ joinFunction.join(value.row, rightSideRow, cRowWrapper)
+ }
+
+ override def processElement2(
+ value: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ if (value.change) {
+ rightState.update(value.row)
+ } else {
+ rightState.clear()
+ }
+ }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
index ab282ec..58ad493 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.stream.table.validation
import java.sql.Timestamp
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils._
import org.junit.Test
@@ -33,9 +33,18 @@ class TemporalTableJoinValidationTest extends TableTestBase {
val orders = util.addTable[(Long, String, Timestamp)](
"Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
+ val ordersProctime = util.addTable[(Long, String)](
+ "OrdersProctime", 'o_amount, 'o_currency, 'o_rowtime.proctime)
+
+ val ordersWithoutTimeAttribute = util.addTable[(Long, String, Timestamp)](
+ "OrdersWithoutTimeAttribute", 'o_amount, 'o_currency, 'o_rowtime)
+
val ratesHistory = util.addTable[(String, Int, Timestamp)](
"RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+ val ratesHistoryWithoutTimeAttribute = util.addTable[(String, Int, Timestamp)](
+ "ratesHistoryWithoutTimeAttribute", 'currency, 'rate, 'rowtime)
+
@Test
def testInvalidFieldReference(): Unit = {
expectedException.expect(classOf[ValidationException])
@@ -51,4 +60,65 @@ class TemporalTableJoinValidationTest extends TableTestBase {
ratesHistory.createTemporalTableFunction("rowtime", "foobar")
}
+
+ @Test
+ def testNonTimeIndicatorOnRightSide(): Unit = {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "Non rowtime timeAttribute [TIMESTAMP(3)] used to create TemporalTableFunction")
+
+ val rates = ratesHistoryWithoutTimeAttribute.createTemporalTableFunction('rowtime, 'currency)
+
+ val result = orders
+ .join(rates('o_rowtime), "currency = o_currency")
+ .select("o_amount * rate").as("rate")
+
+ util.explain(result)
+ }
+
+ @Test
+ def testNonTimeIndicatorOnLeftSide(): Unit = {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "Non rowtime timeAttribute [TIMESTAMP(3)] passed as the argument to TemporalTableFunction")
+
+ val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+
+ val result = ordersWithoutTimeAttribute
+ .join(rates('o_rowtime), "currency = o_currency")
+ .select("o_amount * rate").as("rate")
+
+ util.explain(result)
+ }
+
+ @Test
+ def testMixedTimeIndicators(): Unit = {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "Non rowtime timeAttribute [TIME ATTRIBUTE(PROCTIME)] passed as the argument " +
+ "to TemporalTableFunction")
+
+ val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+
+ val result = ordersProctime
+ .join(rates('o_rowtime), "currency = o_currency")
+ .select("o_amount * rate").as("rate")
+
+ util.explain(result)
+ }
+
+ @Test
+ def testEventTimeIndicators(): Unit = {
+ expectedException.expect(classOf[TableException])
+ expectedException.expectMessage(
+ "Event time temporal joins are not yet supported.")
+
+ val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+
+ val result = orders
+ .join(rates('o_rowtime), "currency = o_currency")
+ .select("o_amount * rate").as("rate")
+
+ util.explain(result)
+ }
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index d494c21..28b7d14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -36,8 +36,15 @@ import org.apache.flink.table.functions.aggfunctions.{IntSumWithRetractAggFuncti
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.junit.Rule
+import org.junit.rules.ExpectedException
class HarnessTestBase {
+ // used for accurate exception information checking.
+ val expectedException = ExpectedException.none()
+
+ @Rule
+ def thrown = expectedException
val longMinWithRetractAggFunction: String =
UserDefinedFunctionUtils.serialize(new LongMinWithRetractAggFunction)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
new file mode 100644
index 0000000..43f3548
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableIntList
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.{TableConfig, Types, ValidationException}
+import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin.TEMPORAL_JOIN_CONDITION
+import org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TestStreamQueryConfig}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.hamcrest.Matchers.startsWith
+import org.junit.Test
+
+class TemporalJoinHarnessTest extends HarnessTestBase {
+
+ private val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+
+ private val tableConfig = new TableConfig
+
+ private val queryConfig =
+ new TestStreamQueryConfig(Time.milliseconds(2), Time.milliseconds(4))
+
+ private val ORDERS_KEY = "o_currency"
+
+ private val ORDERS_PROCTIME = "o_proctime"
+
+ private val RATES_KEY = "r_currency"
+
+ private val ordersRowtimeType = new RowTypeInfo(
+ Array[TypeInformation[_]](
+ Types.LONG,
+ Types.STRING,
+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
+ Array("o_amount", ORDERS_KEY, "o_rowtime"))
+
+ private val ordersProctimeType = new RowTypeInfo(
+ Array[TypeInformation[_]](
+ Types.LONG,
+ Types.STRING,
+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR),
+ Array("o_amount", ORDERS_KEY, ORDERS_PROCTIME))
+
+ private val ratesRowtimeType = new RowTypeInfo(
+ Array[TypeInformation[_]](
+ Types.STRING,
+ Types.LONG,
+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
+ Array(RATES_KEY, "r_rate", "r_rowtime"))
+
+ private val ratesProctimeType = new RowTypeInfo(
+ Array[TypeInformation[_]](
+ Types.STRING,
+ Types.LONG,
+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR),
+ Array(RATES_KEY, "r_rate", "r_proctime"))
+
+ private val joinRowtimeType = new RowTypeInfo(
+ ordersRowtimeType.getFieldTypes ++ ratesRowtimeType.getFieldTypes,
+ ordersRowtimeType.getFieldNames ++ ratesRowtimeType.getFieldNames)
+
+ private val rexBuilder = new RexBuilder(typeFactory)
+
+ @Test
+ def testProctime() {
+ val testHarness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo)
+
+ testHarness.open()
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // process without conversion rates
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", null)))
+
+ // initiate conversion rates
+ testHarness.processElement2(new StreamRecord(CRow("US Dollar", 102L, null)))
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, null)))
+ testHarness.processElement2(new StreamRecord(CRow("Yen", 1L, null)))
+
+ // process with conversion rates
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", null)))
+ testHarness.processElement1(new StreamRecord(CRow(1L, "US Dollar", null)))
+ testHarness.processElement1(new StreamRecord(CRow(50L, "Yen", null)))
+
+ expectedOutput.add(new StreamRecord(CRow(2L, "Euro", null, "Euro", 114L, null)))
+ expectedOutput.add(new StreamRecord(CRow(1L, "US Dollar", null, "US Dollar", 102L, null)))
+ expectedOutput.add(new StreamRecord(CRow(50L, "Yen", null, "Yen", 1L, null)))
+
+ // update Euro
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 116L, null)))
+
+ // process Euro
+ testHarness.processElement1(new StreamRecord(CRow(3L, "Euro", null)))
+
+ expectedOutput.add(new StreamRecord(CRow(3L, "Euro", null, "Euro", 116L, null)))
+
+ // again update Euro
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 119L, null)))
+
+ // process US Dollar
+ testHarness.processElement1(new StreamRecord(CRow(5L, "US Dollar", null)))
+
+ expectedOutput.add(new StreamRecord(CRow(5L, "US Dollar", null, "US Dollar", 102L, null)))
+
+ verify(expectedOutput, testHarness.getOutput, new RowResultSortComparator())
+
+ testHarness.close()
+ }
+
+ @Test
+ def testNonEquiProctime() {
+ val testHarness = createTestHarness(
+ new ProctimeTemporalJoinInfo(
+ new RowTypeInfo(
+ ordersProctimeType.getFieldTypes :+ Types.INT,
+ ordersProctimeType.getFieldNames :+ "foo"),
+ new RowTypeInfo(
+ ratesProctimeType.getFieldTypes :+ Types.INT,
+ ratesProctimeType.getFieldNames :+ "bar"),
+ ORDERS_KEY,
+ RATES_KEY,
+ ORDERS_PROCTIME) {
+ /**
+ * @return [[LogicalTemporalTableJoin.TEMPORAL_JOIN_CONDITION]](...) AND
+ * leftInputRef(3) > rightInputRef(3)
+ */
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.AND,
+ super.getRemaining(rexBuilder),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN,
+ makeLeftInputRef("foo"),
+ makeRightInputRef("bar")))
+ }
+ })
+
+ testHarness.open()
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // initiate conversion rates
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, null, 42)))
+ testHarness.processElement2(new StreamRecord(CRow("Yen", 1L, null, 42)))
+
+ // process with conversion rates
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", null, 0)))
+ testHarness.processElement1(new StreamRecord(CRow(50L, "Yen", null, 44)))
+
+ expectedOutput.add(new StreamRecord(CRow(50L, "Yen", null, 44, "Yen", 1L, null, 42)))
+
+ // update Euro
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 116L, null, 44)))
+
+ // process Euro
+ testHarness.processElement1(new StreamRecord(CRow(3L, "Euro", null, 42)))
+ testHarness.processElement1(new StreamRecord(CRow(4L, "Euro", null, 44)))
+ testHarness.processElement1(new StreamRecord(CRow(5L, "Euro", null, 1337)))
+
+ expectedOutput.add(new StreamRecord(CRow(5L, "Euro", null, 1337, "Euro", 116L, null, 44)))
+
+ // process US Dollar
+ testHarness.processElement1(new StreamRecord(CRow(5L, "US Dollar", null, 1337)))
+
+ verify(expectedOutput, testHarness.getOutput, new RowResultSortComparator())
+
+ testHarness.close()
+ }
+
+ @Test
+ def testMissingTemporalJoinCondition() {
+ expectedException.expect(classOf[IllegalStateException])
+ expectedException.expectMessage(startsWith(s"Missing ${TEMPORAL_JOIN_CONDITION.getName}"))
+
+ translateJoin(new TemporalJoinInfo(
+ ordersProctimeType,
+ ratesProctimeType,
+ ORDERS_KEY,
+ RATES_KEY) {
+
+ override def isEqui: Boolean = true
+
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = rexBuilder.makeLiteral(true)
+ })
+ }
+
+ @Test
+ def testNonEquiMissingTemporalJoinCondition() {
+ expectedException.expect(classOf[IllegalStateException])
+ expectedException.expectMessage(startsWith(s"Missing ${TEMPORAL_JOIN_CONDITION.getName}"))
+
+ translateJoin(new TemporalJoinInfo(
+ ordersProctimeType,
+ ratesProctimeType,
+ ORDERS_KEY,
+ RATES_KEY) {
+
+ override def isEqui: Boolean = true
+
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CONCAT,
+ rexBuilder.makeLiteral("A"),
+ makeLeftInputRef(ORDERS_KEY)),
+ makeRightInputRef(RATES_KEY))
+ }
+ })
+ }
+
+ @Test
+ def testTwoTemporalJoinConditions() {
+ expectedException.expect(classOf[IllegalStateException])
+ expectedException.expectMessage(startsWith(s"Multiple $TEMPORAL_JOIN_CONDITION functions"))
+
+ translateJoin(
+ new OrdersRatesProctimeTemporalJoinInfo() {
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.OR,
+ super.getRemaining(rexBuilder),
+ super.getRemaining(rexBuilder))
+ }
+ })
+ }
+
+ @Test
+ def testIncorrectTemporalJoinCondition() {
+ expectedException.expect(classOf[IllegalStateException])
+ expectedException.expectMessage(startsWith(s"Unsupported invocation"))
+
+ translateJoin(
+ new OrdersRatesProctimeTemporalJoinInfo() {
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+ rexBuilder.makeCall(
+ TEMPORAL_JOIN_CONDITION,
+ makeLeftInputRef(leftKey),
+ makeLeftInputRef(leftKey),
+ makeLeftInputRef(leftKey),
+ makeRightInputRef(rightKey))
+ }
+ })
+ }
+
+ @Test
+ def testUnsupportedPrimaryKeyInTemporalJoinCondition() {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(startsWith("Unsupported right primary key expression"))
+
+ translateJoin(
+ new OrdersRatesProctimeTemporalJoinInfo() {
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+ LogicalTemporalTableJoin.makeProcTimeTemporalJoinConditionCall(
+ rexBuilder,
+ makeLeftInputRef(leftTimeAttribute),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CONCAT,
+ rexBuilder.makeLiteral("A"),
+ makeRightInputRef(RATES_KEY)))
+ }
+ })
+ }
+
+ @Test
+ def testMultipleJoinKeys() {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(startsWith(s"Only single column join key"))
+
+ translateJoin(
+ new TemporalJoinInfo(
+ ordersProctimeType,
+ ratesProctimeType,
+ ImmutableIntList.of(0, 1),
+ ImmutableIntList.of(1, 0)) {
+
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+ LogicalTemporalTableJoin.makeProcTimeTemporalJoinConditionCall(
+ rexBuilder,
+ makeLeftInputRef(ORDERS_PROCTIME),
+ makeRightInputRef(RATES_KEY))
+ }
+ })
+ }
+
+ @Test
+ def testNonInnerJoin() {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(startsWith(s"Only ${JoinRelType.INNER} temporal join"))
+
+ translateJoin(new OrdersRatesProctimeTemporalJoinInfo, JoinRelType.FULL)
+ }
+
+ def createTestHarness(temporalJoinInfo: TemporalJoinInfo)
+ : KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = {
+
+ val (leftKeySelector, rightKeySelector, joinCoProcessFunction) =
+ translateJoin(temporalJoinInfo)
+
+ val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
+ new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinCoProcessFunction)
+
+ new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
+ operator,
+ leftKeySelector.asInstanceOf[KeySelector[CRow, String]],
+ rightKeySelector.asInstanceOf[KeySelector[CRow, String]],
+ BasicTypeInfo.STRING_TYPE_INFO,
+ 1,
+ 1,
+ 0)
+ }
+
+ def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = JoinRelType.INNER)
+ : (CRowKeySelector, CRowKeySelector, CoProcessFunction[CRow, CRow, CRow]) = {
+
+ val leftType = joinInfo.leftRowType
+ val rightType = joinInfo.rightRowType
+ val joinType = new RowTypeInfo(
+ leftType.getFieldTypes ++ rightType.getFieldTypes,
+ leftType.getFieldNames ++ rightType.getFieldNames)
+
+ val joinTranslator = DataStreamTemporalJoinToCoProcessTranslator.create(
+ "TemporalJoin",
+ tableConfig,
+ joinType,
+ new RowSchema(typeFactory.createTypeFromTypeInfo(leftType, false)),
+ new RowSchema(typeFactory.createTypeFromTypeInfo(rightType, false)),
+ joinInfo,
+ rexBuilder)
+
+ val joinCoProcessFunction = joinTranslator.getCoProcessFunction(
+ joinRelType,
+ joinType.getFieldNames,
+ "TemporalJoin",
+ queryConfig)
+
+ (joinTranslator.getLeftKeySelector(),
+ joinTranslator.getRightKeySelector(),
+ joinCoProcessFunction)
+ }
+
+ abstract class TemporalJoinInfo(
+ val leftRowType: RowTypeInfo,
+ val rightRowType: RowTypeInfo,
+ leftKeys: ImmutableIntList,
+ rightKeys: ImmutableIntList)
+ extends JoinInfo(leftKeys, rightKeys) {
+
+ def this(
+ leftRowType: RowTypeInfo,
+ rightRowType: RowTypeInfo,
+ leftKey: String,
+ rightKey: String) =
+ this(
+ leftRowType,
+ rightRowType,
+ ImmutableIntList.of(leftRowType.getFieldIndex(leftKey)),
+ ImmutableIntList.of(rightRowType.getFieldIndex(rightKey)))
+
+ override def isEqui: Boolean = false
+
+ def makeLeftInputRef(leftField: String): RexNode = {
+ rexBuilder.makeInputRef(
+ typeFactory.createTypeFromTypeInfo(leftRowType.getTypeAt(leftField), false),
+ leftRowType.getFieldIndex(leftField))
+ }
+
+ def makeRightInputRef(rightField: String): RexNode = {
+ rexBuilder.makeInputRef(
+ typeFactory.createTypeFromTypeInfo(rightRowType.getTypeAt(rightField), false),
+ rightRowType.getFieldIndex(rightField) + leftRowType.getFieldTypes.length)
+ }
+ }
+
+ class OrdersRatesProctimeTemporalJoinInfo()
+ extends ProctimeTemporalJoinInfo(
+ ordersProctimeType,
+ ratesProctimeType,
+ ORDERS_KEY,
+ RATES_KEY,
+ ORDERS_PROCTIME)
+
+ class ProctimeTemporalJoinInfo(
+ leftRowType: RowTypeInfo,
+ rightRowType: RowTypeInfo,
+ val leftKey: String,
+ val rightKey: String,
+ val leftTimeAttribute: String)
+ extends TemporalJoinInfo(leftRowType, rightRowType, leftKey, rightKey) {
+
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = {
+ LogicalTemporalTableJoin.makeProcTimeTemporalJoinConditionCall(
+ rexBuilder,
+ makeLeftInputRef(leftTimeAttribute),
+ makeRightInputRef(rightKey))
+ }
+ }
+
+ class MissingTemporalJoinConditionJoinInfo(
+ leftRowType: RowTypeInfo,
+ rightRowType: RowTypeInfo,
+ leftKey: String,
+ rightKey: String,
+ isEquiJoin: Boolean)
+ extends TemporalJoinInfo(leftRowType, rightRowType, leftKey, rightKey) {
+
+ override def isEqui: Boolean = isEquiJoin
+
+ override def getRemaining(rexBuilder: RexBuilder): RexNode = if (isEquiJoin) {
+ rexBuilder.makeLiteral(true)
+ }
+ else {
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CONCAT,
+ rexBuilder.makeLiteral("A"),
+ makeLeftInputRef(leftKey)),
+ makeRightInputRef(rightKey))
+ }
+ }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
new file mode 100644
index 0000000..b0304a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class TemporalJoinITCase extends StreamingWithStateTestBase {
+
+ /**
+ * Because of nature of the processing time, we can not (or at least it is not that easy)
+ * validate the result here. Instead of that, here we are just testing whether there are no
+ * exceptions in a full blown ITCase. Actual correctness is tested in unit tests.
+ */
+ @Test
+ def testProcessTimeInnerJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val sqlQuery =
+ """
+ |SELECT
+ | o.amount * r.rate AS amount
+ |FROM
+ | Orders AS o,
+ | LATERAL TABLE (Rates(o.proctime)) AS r
+ |WHERE r.currency = o.currency
+ |""".stripMargin
+
+ val ordersData = new mutable.MutableList[(Long, String)]
+ ordersData.+=((2L, "Euro"))
+ ordersData.+=((1L, "US Dollar"))
+ ordersData.+=((50L, "Yen"))
+ ordersData.+=((3L, "Euro"))
+ ordersData.+=((5L, "US Dollar"))
+
+ val ratesHistoryData = new mutable.MutableList[(String, Long)]
+ ratesHistoryData.+=(("US Dollar", 102L))
+ ratesHistoryData.+=(("Euro", 114L))
+ ratesHistoryData.+=(("Yen", 1L))
+ ratesHistoryData.+=(("Euro", 116L))
+ ratesHistoryData.+=(("Euro", 119L))
+
+ val orders = env
+ .fromCollection(ordersData)
+ .toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
+ val ratesHistory = env
+ .fromCollection(ratesHistoryData)
+ .toTable(tEnv, 'currency, 'rate, 'proctime.proctime)
+
+ tEnv.registerTable("Orders", orders)
+ tEnv.registerTable("RatesHistory", ratesHistory)
+ tEnv.registerFunction(
+ "Rates",
+ ratesHistory.createTemporalTableFunction('proctime, 'currency))
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+ @Test
+ def testEventTimeInnerJoin(): Unit = {
+ expectedException.expect(classOf[TableException])
+ expectedException.expectMessage("Event time temporal joins are not yet supported")
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val sqlQuery =
+ """
+ |SELECT
+ | o.amount * r.rate AS amount
+ |FROM
+ | Orders AS o,
+ | LATERAL TABLE (Rates(o.rowtime)) AS r
+ |WHERE r.currency = o.currency
+ |""".stripMargin
+
+ val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
+
+ val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
+
+ val orders = env
+ .fromCollection(ordersData)
+ .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
+ val ratesHistory = env
+ .fromCollection(ratesHistoryData)
+ .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
+
+ tEnv.registerTable("Orders", orders)
+ tEnv.registerTable("RatesHistory", ratesHistory)
+ tEnv.registerFunction(
+ "Rates",
+ ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
index b3eeb59..b8c414f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
@@ -21,10 +21,15 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.test.util.AbstractTestBase
import org.junit.Rule
-import org.junit.rules.TemporaryFolder
+import org.junit.rules.{ExpectedException, TemporaryFolder}
class StreamingWithStateTestBase extends AbstractTestBase {
+ val expectedException = ExpectedException.none()
+
+ @Rule
+ def thrown = expectedException
+
val _tempFolder = new TemporaryFolder
@Rule