Posted to commits@flink.apache.org by fh...@apache.org on 2017/07/18 07:40:06 UTC
[1/3] flink git commit: [FLINK-6232] [table] Add support for processing time inner windowed stream join.
Repository: flink
Updated Branches:
refs/heads/master 1125122a7 -> 471345c0e
[FLINK-6232] [table] Add support for processing time inner windowed stream join.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba6c59e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba6c59e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba6c59e6
Branch: refs/heads/master
Commit: ba6c59e6d744958db7332f88ffdd46effc9ad400
Parents: 1125122
Author: hongyuhong <ho...@huawei.com>
Authored: Thu Jul 6 11:24:04 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 18 00:36:41 2017 +0200
----------------------------------------------------------------------
.../table/api/StreamTableEnvironment.scala | 109 +-----
.../calcite/RelTimeIndicatorConverter.scala | 10 +-
.../flink/table/plan/nodes/CommonJoin.scala | 73 ++++
.../table/plan/nodes/dataset/DataSetJoin.scala | 47 ++-
.../nodes/datastream/DataStreamWindowJoin.scala | 187 ++++++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 1 +
.../datastream/DataStreamWindowJoinRule.scala | 94 +++++
.../runtime/join/ProcTimeWindowInnerJoin.scala | 326 +++++++++++++++++
.../table/runtime/join/WindowJoinUtil.scala | 349 +++++++++++++++++++
.../table/updateutils/UpdateCheckUtils.scala | 128 +++++++
.../table/api/scala/stream/sql/JoinITCase.scala | 117 +++++++
.../table/api/scala/stream/sql/JoinTest.scala | 235 +++++++++++++
.../table/runtime/harness/JoinHarnessTest.scala | 236 +++++++++++++
.../KeyedTwoInputStreamOperatorTestHarness.java | 9 +
14 files changed, 1788 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
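For context, this commit enables SQL queries that join two streams within a processing-time window. A minimal sketch of such a query, adapted from the JoinITCase added below (table names T1/T2 and fields a, b, c, proctime follow the test setup):

    // Scala: the kind of windowed join query exercised by the new tests
    object WindowJoinQuerySketch {
      val sqlQuery: String =
        "SELECT t2.a, t2.c, t1.c FROM T1 AS t1 JOIN T2 AS t2 ON t1.a = t2.a AND " +
        "t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND " +
        "AND t2.proctime + INTERVAL '5' SECOND"
    }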
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c5a66b5..f026824 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -50,6 +50,7 @@ import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowIn
import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.updateutils.UpdateCheckUtils
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
@@ -173,10 +174,10 @@ abstract class StreamTableEnvironment(
// optimize plan
val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
// check for append only table
- val isAppendOnlyTable = isAppendOnly(optimizedPlan)
+ val isAppendOnlyTable = UpdateCheckUtils.isAppendOnly(optimizedPlan)
upsertSink.setIsAppendOnly(isAppendOnlyTable)
// extract unique key fields
- val tableKeys: Option[Array[String]] = getUniqueKeyFields(optimizedPlan)
+ val tableKeys: Option[Array[String]] = UpdateCheckUtils.getUniqueKeyFields(optimizedPlan)
// check that we have keys if the table has changes (is not append-only)
tableKeys match {
case Some(keys) => upsertSink.setKeyFields(keys)
@@ -200,7 +201,7 @@ abstract class StreamTableEnvironment(
// optimize plan
val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
// verify table is an insert-only (append-only) table
- if (!isAppendOnly(optimizedPlan)) {
+ if (!UpdateCheckUtils.isAppendOnly(optimizedPlan)) {
throw new TableException(
"AppendStreamTableSink requires that Table has only insert changes.")
}
@@ -259,21 +260,6 @@ abstract class StreamTableEnvironment(
}
}
- /** Validates that the plan produces only append changes. */
- protected def isAppendOnly(plan: RelNode): Boolean = {
- val appendOnlyValidator = new AppendOnlyValidator
- appendOnlyValidator.go(plan)
-
- appendOnlyValidator.isAppendOnly
- }
-
- /** Extracts the unique keys of the table produced by the plan. */
- protected def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
- val keyExtractor = new UniqueKeyExtractor
- keyExtractor.go(plan)
- keyExtractor.keys
- }
-
/**
* Creates a converter that maps the internal CRow type to Scala or Java Tuple2 with change flag.
*
@@ -665,7 +651,7 @@ abstract class StreamTableEnvironment(
(implicit tpe: TypeInformation[A]): DataStream[A] = {
// if no change flags are requested, verify table is an insert-only (append-only) table.
- if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
+ if (!withChangeFlag && !UpdateCheckUtils.isAppendOnly(logicalPlan)) {
throw new TableException(
"Table is not an append-only table. " +
"Use the toRetractStream() in order to handle add and retract messages.")
@@ -749,90 +735,5 @@ abstract class StreamTableEnvironment(
s"$sqlPlan"
}
- private class AppendOnlyValidator extends RelVisitor {
-
- var isAppendOnly = true
-
- override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
- node match {
- case s: DataStreamRel if s.producesUpdates =>
- isAppendOnly = false
- case _ =>
- super.visit(node, ordinal, parent)
- }
- }
- }
-
- /** Identifies unique key fields in the output of a RelNode. */
- private class UniqueKeyExtractor extends RelVisitor {
-
- var keys: Option[Array[String]] = None
-
- override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
- node match {
- case c: DataStreamCalc =>
- super.visit(node, ordinal, parent)
- // check if input has keys
- if (keys.isDefined) {
- // track keys forward
- val inNames = c.getInput.getRowType.getFieldNames
- val inOutNames = c.getProgram.getNamedProjects.asScala
- .map(p => {
- c.getProgram.expandLocalRef(p.left) match {
- // output field is forwarded input field
- case i: RexInputRef => (i.getIndex, p.right)
- // output field is renamed input field
- case a: RexCall if a.getKind.equals(SqlKind.AS) =>
- a.getOperands.get(0) match {
- case ref: RexInputRef =>
- (ref.getIndex, p.right)
- case _ =>
- (-1, p.right)
- }
- // output field is not forwarded from input
- case _: RexNode => (-1, p.right)
- }
- })
- // filter all non-forwarded fields
- .filter(_._1 >= 0)
- // resolve names of input fields
- .map(io => (inNames.get(io._1), io._2))
-
- // filter by input keys
- val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
- // check if all keys have been preserved
- if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
- // all key have been preserved (but possibly renamed)
- keys = Some(outKeys.toArray)
- } else {
- // some (or all) keys have been removed. Keys are no longer unique and removed
- keys = None
- }
- }
- case _: DataStreamOverAggregate =>
- super.visit(node, ordinal, parent)
- // keys are always forwarded by Over aggregate
- case a: DataStreamGroupAggregate =>
- // get grouping keys
- val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
- keys = Some(groupKeys.toArray)
- case w: DataStreamGroupWindowAggregate =>
- // get grouping keys
- val groupKeys =
- w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
- // get window start and end time
- val windowStartEnd = w.getWindowProperties.map(_.name)
- // we have only a unique key if at least one window property is selected
- if (windowStartEnd.nonEmpty) {
- keys = Some(groupKeys ++ windowStartEnd)
- }
- case _: DataStreamRel =>
- // anything else does not forward keys or might duplicate key, so we can stop
- keys = None
- }
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 385cab2..32d6f01 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -165,8 +165,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
LogicalProject.create(input, projects, fieldNames)
}
- override def visit(join: LogicalJoin): RelNode =
- throw new TableException("Logical join in a stream environment is not supported yet.")
+ override def visit(join: LogicalJoin): RelNode = {
+ val left = join.getLeft.accept(this)
+ val right = join.getRight.accept(this)
+
+ LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, join.getJoinType)
+
+ }
+
override def visit(correlate: LogicalCorrelate): RelNode = {
// visit children and update inputs
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
new file mode 100644
index 0000000..7d0ca35
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.RelWriter
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConverters._
+
+trait CommonJoin {
+
+ private[flink] def joinSelectionToString(inputType: RelDataType): String = {
+ inputType.getFieldNames.asScala.toList.mkString(", ")
+ }
+
+ private[flink] def joinConditionToString(
+ inputType: RelDataType,
+ joinCondition: RexNode,
+ expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+ val inFields = inputType.getFieldNames.asScala.toList
+ expression(joinCondition, inFields, None)
+ }
+
+ private[flink] def joinTypeToString(joinType: JoinRelType) = {
+ joinType match {
+ case JoinRelType.INNER => "InnerJoin"
+ case JoinRelType.LEFT => "LeftOuterJoin"
+ case JoinRelType.RIGHT => "RightOuterJoin"
+ case JoinRelType.FULL => "FullOuterJoin"
+ }
+ }
+
+ private[flink] def joinToString(
+ inputType: RelDataType,
+ joinCondition: RexNode,
+ joinType: JoinRelType,
+ expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+ s"${joinTypeToString(joinType)}" +
+ s"(where: (${joinConditionToString(inputType, joinCondition, expression)}), " +
+ s"join: (${joinSelectionToString(inputType)}))"
+ }
+
+ private[flink] def joinExplainTerms(
+ pw: RelWriter,
+ inputType: RelDataType,
+ joinCondition: RexNode,
+ joinType: JoinRelType,
+ expression: (RexNode, List[String], Option[List[RexNode]]) => String): RelWriter = {
+
+ pw.item("where", joinConditionToString(inputType, joinCondition, expression))
+ .item("join", joinSelectionToString(inputType))
+ .item("joinType", joinTypeToString(joinType))
+ }
+}
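The trait centralizes how join nodes print themselves in plans and logs. A minimal sketch of the format produced by joinToString (the condition and field strings are illustrative placeholders, not taken from a real plan):

    object JoinStringFormatSketch {
      def main(args: Array[String]): Unit = {
        val joinType  = "InnerJoin"           // joinTypeToString
        val condition = "=(a, a0)"            // joinConditionToString (placeholder)
        val fields    = "a, b, c, a0, b0, c0" // joinSelectionToString (placeholder)
        // prints: InnerJoin(where: (=(a, a0)), join: (a, b, c, a0, b0, c0))
        println(s"$joinType(where: ($condition), join: ($fields))")
      }
    }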
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index a6c31d3..e8f3b82 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -31,11 +31,11 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.plan.nodes.CommonJoin
import org.apache.flink.table.runtime.FlatJoinRunner
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
/**
@@ -55,6 +55,7 @@ class DataSetJoin(
joinHint: JoinHint,
ruleDescription: String)
extends BiRel(cluster, traitSet, leftNode, rightNode)
+ with CommonJoin
with DataSetRel {
override def deriveRowType() = rowRelDataType
@@ -76,14 +77,20 @@ class DataSetJoin(
}
override def toString: String = {
- s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+ joinToString(
+ joinRowType,
+ joinCondition,
+ joinType,
+ getExpressionString)
}
override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .item("where", joinConditionToString)
- .item("join", joinSelectionToString)
- .item("joinType", joinTypeToString)
+ joinExplainTerms(
+ super.explainTerms(pw),
+ joinRowType,
+ joinCondition,
+ joinType,
+ getExpressionString)
}
override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -116,7 +123,8 @@ class DataSetJoin(
"Joins should have at least one equality condition.\n" +
s"\tLeft: ${left.toString},\n" +
s"\tRight: ${right.toString},\n" +
- s"\tCondition: ($joinConditionToString)"
+ s"\tCondition: (${joinConditionToString(joinRowType,
+ joinCondition, getExpressionString)})"
)
}
else {
@@ -138,7 +146,8 @@ class DataSetJoin(
"Equality join predicate on incompatible types.\n" +
s"\tLeft: ${left.toString},\n" +
s"\tRight: ${right.toString},\n" +
- s"\tCondition: ($joinConditionToString)"
+ s"\tCondition: (${joinConditionToString(joinRowType,
+ joinCondition, getExpressionString)})"
)
}
})
@@ -197,7 +206,9 @@ class DataSetJoin(
genFunction.code,
genFunction.returnType)
- val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
+ val joinOpName =
+ s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
+ s"join: (${joinSelectionToString(joinRowType)})"
joinOperator
.where(leftKeys.toArray: _*)
@@ -205,22 +216,4 @@ class DataSetJoin(
.`with`(joinFun)
.name(joinOpName)
}
-
- private def joinSelectionToString: String = {
- getRowType.getFieldNames.asScala.toList.mkString(", ")
- }
-
- private def joinConditionToString: String = {
-
- val inFields = joinRowType.getFieldNames.asScala.toList
- getExpressionString(joinCondition, inFields, None)
- }
-
- private def joinTypeToString = joinType match {
- case JoinRelType.INNER => "InnerJoin"
- case JoinRelType.LEFT=> "LeftOuterJoin"
- case JoinRelType.RIGHT => "RightOuterJoin"
- case JoinRelType.FULL => "FullOuterJoin"
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
new file mode 100644
index 0000000..1315a79
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+ * Flink RelNode that represents a time-windowed join of two streams.
+ */
+class DataStreamWindowJoin(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ leftNode: RelNode,
+ rightNode: RelNode,
+ joinCondition: RexNode,
+ joinType: JoinRelType,
+ leftSchema: RowSchema,
+ rightSchema: RowSchema,
+ schema: RowSchema,
+ isRowTime: Boolean,
+ leftLowerBound: Long,
+ leftUpperBound: Long,
+ remainCondition: Option[RexNode],
+ ruleDescription: String)
+ extends BiRel(cluster, traitSet, leftNode, rightNode)
+ with CommonJoin
+ with DataStreamRel {
+
+ override def deriveRowType() = schema.logicalType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamWindowJoin(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ inputs.get(1),
+ joinCondition,
+ joinType,
+ leftSchema,
+ rightSchema,
+ schema,
+ isRowTime,
+ leftLowerBound,
+ leftUpperBound,
+ remainCondition,
+ ruleDescription)
+ }
+
+ override def toString: String = {
+ joinToString(
+ schema.logicalType,
+ joinCondition,
+ joinType,
+ getExpressionString)
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ joinExplainTerms(
+ super.explainTerms(pw),
+ schema.logicalType,
+ joinCondition,
+ joinType,
+ getExpressionString)
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+ val config = tableEnv.getConfig
+
+ val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
+ val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+ if (!isLeftAppendOnly || !isRightAppendOnly) {
+ throw new TableException(
+ "Windowed stream join does not support updates.")
+ }
+
+ val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+ val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+
+ // get the equality keys and other condition
+ val joinInfo = JoinInfo.of(leftNode, rightNode, joinCondition)
+ val leftKeys = joinInfo.leftKeys.toIntArray
+ val rightKeys = joinInfo.rightKeys.toIntArray
+
+ // generate join function
+ val joinFunction =
+ WindowJoinUtil.generateJoinFunction(
+ config,
+ joinType,
+ leftSchema.physicalTypeInfo,
+ rightSchema.physicalTypeInfo,
+ schema,
+ remainCondition,
+ ruleDescription)
+
+ joinType match {
+ case JoinRelType.INNER =>
+ isRowTime match {
+ case false =>
+ // Proctime JoinCoProcessFunction
+ createProcTimeInnerJoinFunction(
+ leftDataStream,
+ rightDataStream,
+ joinFunction.name,
+ joinFunction.code,
+ leftKeys,
+ rightKeys
+ )
+ case true =>
+ // RowTime JoinCoProcessFunction
+ throw new TableException(
+ "RowTime inner join between stream and stream is not supported yet.")
+ }
+ case JoinRelType.FULL =>
+ throw new TableException(
+ "Full join between stream and stream is not supported yet.")
+ case JoinRelType.LEFT =>
+ throw new TableException(
+ "Left join between stream and stream is not supported yet.")
+ case JoinRelType.RIGHT =>
+ throw new TableException(
+ "Right join between stream and stream is not supported yet.")
+ }
+ }
+
+ def createProcTimeInnerJoinFunction(
+ leftDataStream: DataStream[CRow],
+ rightDataStream: DataStream[CRow],
+ joinFunctionName: String,
+ joinFunctionCode: String,
+ leftKeys: Array[Int],
+ rightKeys: Array[Int]): DataStream[CRow] = {
+
+ val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+
+ val procInnerJoinFunc = new ProcTimeWindowInnerJoin(
+ leftLowerBound,
+ leftUpperBound,
+ leftSchema.physicalTypeInfo,
+ rightSchema.physicalTypeInfo,
+ joinFunctionName,
+ joinFunctionCode)
+
+ if (!leftKeys.isEmpty) {
+ leftDataStream.connect(rightDataStream)
+ .keyBy(leftKeys, rightKeys)
+ .process(procInnerJoinFunc)
+ .returns(returnTypeInfo)
+ } else {
+ leftDataStream.connect(rightDataStream)
+ .keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow]())
+ .process(procInnerJoinFunc)
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ .returns(returnTypeInfo)
+ }
+ }
+}
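A note on the partitioning in createProcTimeInnerJoinFunction: with equi-keys, both inputs are keyed so that matching records meet in the same parallel instance; without keys, every record must be able to join every opposite record, so the operator is pinned to parallelism 1. A plain-Scala sketch of that decision (names are placeholders, not Flink API):

    object PartitioningSketch {
      // Placeholder model of the partitioning choice above.
      def describePartitioning(leftKeys: Array[Int]): String =
        if (leftKeys.nonEmpty)
          "keyBy(leftKeys, rightKeys): parallel; equal keys meet in one instance"
        else
          "NullByteKeySelector + parallelism(1): all records in a single task"
    }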
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index ebfbeb9..90bd624 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -191,6 +191,7 @@ object FlinkRuleSets {
DataStreamUnionRule.INSTANCE,
DataStreamValuesRule.INSTANCE,
DataStreamCorrelateRule.INSTANCE,
+ DataStreamWindowJoinRule.INSTANCE,
StreamTableSourceScanRule.INSTANCE
)
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
new file mode 100644
index 0000000..c7a190f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamWindowJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+
+class DataStreamWindowJoinRule
+ extends ConverterRule(
+ classOf[FlinkLogicalJoin],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.DATASTREAM,
+ "DataStreamJoinRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+ val joinInfo = join.analyzeCondition
+
+ try {
+ WindowJoinUtil.analyzeTimeBoundary(
+ joinInfo.getRemaining(join.getCluster.getRexBuilder),
+ join.getLeft.getRowType.getFieldCount,
+ new RowSchema(join.getRowType),
+ join.getCluster.getRexBuilder,
+ TableConfig.DEFAULT)
+ true
+ } catch {
+ case _: TableException =>
+ false
+ }
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+
+ val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+ val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM)
+ val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConventions.DATASTREAM)
+ val joinInfo = join.analyzeCondition
+ val leftRowSchema = new RowSchema(convLeft.getRowType)
+ val rightRowSchema = new RowSchema(convRight.getRowType)
+
+ val (isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition) =
+ WindowJoinUtil.analyzeTimeBoundary(
+ joinInfo.getRemaining(join.getCluster.getRexBuilder),
+ leftRowSchema.logicalArity,
+ new RowSchema(join.getRowType),
+ join.getCluster.getRexBuilder,
+ TableConfig.DEFAULT)
+
+ new DataStreamWindowJoin(
+ rel.getCluster,
+ traitSet,
+ convLeft,
+ convRight,
+ join.getCondition,
+ join.getJoinType,
+ leftRowSchema,
+ rightRowSchema,
+ new RowSchema(rel.getRowType),
+ isRowTime,
+ leftLowerBoundary,
+ leftUpperBoundary,
+ remainCondition,
+ description)
+ }
+}
+
+object DataStreamWindowJoinRule {
+ val INSTANCE: RelOptRule = new DataStreamWindowJoinRule
+}
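The rule effectively does a dry run of WindowJoinUtil.analyzeTimeBoundary in matches() and only converts joins whose non-equi predicates bound the time attributes of both inputs in both directions. Illustrative predicates (field names are assumptions, not from the source):

    object WindowJoinRuleExamples {
      // matches: both directions of the time attributes are bounded
      val windowed = "o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime"
      // no time bounds: analyzeTimeBoundary throws, so matches() returns false
      val unbounded = "o.currency = s.currency"
    }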
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
new file mode 100644
index 0000000..97d5ccc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+ * A CoProcessFunction that performs a stream-to-stream join; currently only inner joins are supported.
+ *
+ * @param leftLowerBound
+ * the left stream lower bound, and -leftLowerBound is the right stream upper bound
+ * @param leftUpperBound
+ * the left stream upper bound, and -leftUpperBound is the right stream lower bound
+ * @param element1Type the input type of the left stream
+ * @param element2Type the input type of the right stream
+ * @param genJoinFuncName the name of the generated function for the other non-equi condition
+ * @param genJoinFuncCode the code of the generated function for the other non-equi condition
+ *
+ */
+class ProcTimeWindowInnerJoin(
+ private val leftLowerBound: Long,
+ private val leftUpperBound: Long,
+ private val element1Type: TypeInformation[Row],
+ private val element2Type: TypeInformation[Row],
+ private val genJoinFuncName: String,
+ private val genJoinFuncCode: String)
+ extends CoProcessFunction[CRow, CRow, CRow]
+ with Compiler[FlatJoinFunction[Row, Row, Row]] {
+
+ private var cRowWrapper: CRowWrappingCollector = _
+
+ /** other condition function **/
+ private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+ /** tmp list to store expired records **/
+ private var listToRemove: JList[Long] = _
+
+ /** state to hold left stream element **/
+ private var row1MapState: MapState[Long, JList[Row]] = _
+ /** state to hold right stream element **/
+ private var row2MapState: MapState[Long, JList[Row]] = _
+
+ /** state to record last timer of left stream, 0 means no timer **/
+ private var timerState1: ValueState[Long] = _
+ /** state to record last timer of right stream, 0 means no timer **/
+ private var timerState2: ValueState[Long] = _
+
+ private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
+ private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ override def open(config: Configuration) {
+ LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+ s"Code:\n$genJoinFuncCode")
+ val clazz = compile(
+ getRuntimeContext.getUserCodeClassLoader,
+ genJoinFuncName,
+ genJoinFuncCode)
+ LOG.debug("Instantiating JoinFunction.")
+ joinFunction = clazz.newInstance()
+
+ listToRemove = new util.ArrayList[Long]()
+ cRowWrapper = new CRowWrappingCollector()
+ cRowWrapper.setChange(true)
+
+ // initialize row state
+ val rowListTypeInfo1: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
+ val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
+ row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+ val rowListTypeInfo2: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element2Type)
+ val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
+ row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+ // initialize timer state
+ val valueStateDescriptor1: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+ timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+ val valueStateDescriptor2: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+ timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
+ }
+
+ /**
+ * Process left stream records
+ *
+ * @param valueC The input value.
+ * @param ctx The context to register timers or access the current processing time
+ * @param out The collector for returning result values.
+ *
+ */
+ override def processElement1(
+ valueC: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ processElement(
+ valueC,
+ ctx,
+ out,
+ leftStreamWinSize,
+ timerState1,
+ row1MapState,
+ row2MapState,
+ -leftUpperBound, // right stream lower
+ -leftLowerBound, // right stream upper
+ true
+ )
+ }
+
+ /**
+ * Process right stream records
+ *
+ * @param valueC The input value.
+ * @param ctx The context to register timers or access the current processing time
+ * @param out The collector for returning result values.
+ *
+ */
+ override def processElement2(
+ valueC: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ processElement(
+ valueC,
+ ctx,
+ out,
+ rightStreamWinSize,
+ timerState2,
+ row2MapState,
+ row1MapState,
+ leftLowerBound, // left stream lower
+ leftUpperBound, // left stream upper
+ false
+ )
+ }
+
+ /**
+ * Called when a processing-time timer fires.
+ * Expires left/right records that are earlier than (current time - window size).
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx The context to register timers or access the current processing time
+ * @param out The collector for returning result values.
+ */
+ override def onTimer(
+ timestamp: Long,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
+
+ if (timerState1.value == timestamp) {
+ expireOutTimeRow(
+ timestamp,
+ leftStreamWinSize,
+ row1MapState,
+ timerState1,
+ ctx
+ )
+ }
+
+ if (timerState2.value == timestamp) {
+ expireOutTimeRow(
+ timestamp,
+ rightStreamWinSize,
+ row2MapState,
+ timerState2,
+ ctx
+ )
+ }
+ }
+
+ /**
+ * Puts an element from the input stream into state, searches the other stream's state
+ * for records that meet the join condition and emits them, and registers a timer for
+ * the current record if no timer is registered yet.
+ */
+ private def processElement(
+ valueC: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow],
+ winSize: Long,
+ timerState: ValueState[Long],
+ rowMapState: MapState[Long, JList[Row]],
+ oppoRowMapState: MapState[Long, JList[Row]],
+ oppoLowerBound: Long,
+ oppoUpperBound: Long,
+ isLeft: Boolean): Unit = {
+
+ cRowWrapper.out = out
+
+ val value = valueC.row
+
+ val curProcessTime = ctx.timerService.currentProcessingTime
+ val oppoLowerTime = curProcessTime + oppoLowerBound
+ val oppoUpperTime = curProcessTime + oppoUpperBound
+
+ // the element only needs to be stored if the window size is not 0
+ if (winSize != 0) {
+ // register a timer to expire the element
+ if (timerState.value == 0) {
+ ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
+ timerState.update(curProcessTime + winSize + 1)
+ }
+
+ var rowList = rowMapState.get(curProcessTime)
+ if (rowList == null) {
+ rowList = new util.ArrayList[Row]()
+ }
+ rowList.add(value)
+ rowMapState.put(curProcessTime, rowList)
+
+ }
+
+ // loop over the elements of the other stream
+ val oppositeKeyIter = oppoRowMapState.keys().iterator()
+ while (oppositeKeyIter.hasNext) {
+ val eleTime = oppositeKeyIter.next()
+ if (eleTime < oppoLowerTime) {
+ listToRemove.add(eleTime)
+ } else if (eleTime <= oppoUpperTime) {
+ val oppoRowList = oppoRowMapState.get(eleTime)
+ var i = 0
+ if (isLeft) {
+ while (i < oppoRowList.size) {
+ joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
+ i += 1
+ }
+ } else {
+ while (i < oppoRowList.size) {
+ joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
+ i += 1
+ }
+ }
+ }
+ }
+
+ // remove expired records from the opposite state
+ var i = listToRemove.size - 1
+ while (i >= 0) {
+ oppoRowMapState.remove(listToRemove.get(i))
+ i -= 1
+ }
+ listToRemove.clear()
+ }
+
+ /**
+ * Removes records which are outside the join window from the state.
+ * Registers a new timer if the state still holds records after the clean-up.
+ */
+ private def expireOutTimeRow(
+ curTime: Long,
+ winSize: Long,
+ rowMapState: MapState[Long, JList[Row]],
+ timerState: ValueState[Long],
+ ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+ val expiredTime = curTime - winSize
+ val keyIter = rowMapState.keys().iterator()
+ var nextTimer: Long = 0
+ // Search for expired timestamps.
+ // If we find a non-expired timestamp, remember the timestamp and leave the loop.
+ // This way we find all expired timestamps if they are sorted without doing a full pass.
+ while (keyIter.hasNext && nextTimer == 0) {
+ val recordTime = keyIter.next
+ if (recordTime < expiredTime) {
+ listToRemove.add(recordTime)
+ } else {
+ nextTimer = recordTime
+ }
+ }
+
+ // Remove expired records from state
+ var i = listToRemove.size - 1
+ while (i >= 0) {
+ rowMapState.remove(listToRemove.get(i))
+ i -= 1
+ }
+ listToRemove.clear()
+
+ // If the state has non-expired timestamps, register a new timer.
+ // Otherwise clean the complete state for this input.
+ if (nextTimer != 0) {
+ ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
+ timerState.update(nextTimer + winSize + 1)
+ } else {
+ timerState.clear()
+ rowMapState.clear()
+ }
+ }
+}
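To make the bound handling concrete: for leftLowerBound = -5000 and leftUpperBound = 5000 (the bounds derived from the BETWEEN predicate in the tests), a left record arriving at processing time t joins the cached right rows with timestamps in [t - 5000, t + 5000]. A self-contained sketch of the interval arithmetic in processElement1 (values are illustrative):

    object WindowBoundsSketch {
      def main(args: Array[String]): Unit = {
        val leftLowerBound = -5000L // from "t2.proctime - INTERVAL '5' SECOND"
        val leftUpperBound = 5000L  // from "t2.proctime + INTERVAL '5' SECOND"
        val t = 100000L             // processing time of an incoming left row

        // opposite-side (right stream) bounds, as passed by processElement1
        val oppoLowerTime = t + (-leftUpperBound) // 95000: oldest joinable right row
        val oppoUpperTime = t + (-leftLowerBound) // 105000: newest joinable right row
        println(s"left row at $t joins right rows in [$oppoLowerTime, $oppoUpperTime]")
      }
    }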
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
new file mode 100644
index 0000000..fabeeba
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+ * A utility class that helps analyze windowed joins and generate join function code.
+ */
+object WindowJoinUtil {
+
+ /**
+ * Analyzes the time condition to compute the time boundaries for each stream,
+ * determine the time type, and extract the remaining (non-time) condition.
+ *
+ * @param condition join condition
+ * @param leftLogicalFieldCnt left stream logical field num
+ * @param inputSchema join rowtype schema
+ * @param rexBuilder util to build rexNode
+ * @param config table environment config
+ * @return isRowTime, left lower boundary, left upper boundary, remaining condition
+ */
+ private[flink] def analyzeTimeBoundary(
+ condition: RexNode,
+ leftLogicalFieldCnt: Int,
+ inputSchema: RowSchema,
+ rexBuilder: RexBuilder,
+ config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+
+ // Converts the condition to conjunctive normal form (CNF)
+ val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+
+ // split the condition into time indicator condition and other condition
+ val (timeTerms, remainTerms) = cnfCondition match {
+ case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+ c.getOperands.asScala
+ .map(analyzeConditionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
+ .reduceLeft((l, r) => {
+ (l._1 ++ r._1, l._2 ++ r._2)
+ })
+ case _ =>
+ throw new TableException("A time-based stream join requires exactly " +
+ "two join predicates that bound the time in both directions.")
+ }
+
+ if (timeTerms.size != 2) {
+ throw new TableException("A time-based stream join requires exactly " +
+ "two join predicates that bound the time in both directions.")
+ }
+
+ // extract the time offset from the time indicator condition
+ val streamTimeOffsets =
+ timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, config))
+
+ val (leftLowerBound, leftUpperBound) =
+ streamTimeOffsets match {
+ case Seq((x, true), (y, false)) => (x, y)
+ case Seq((x, false), (y, true)) => (y, x)
+ case _ =>
+ throw new TableException(
+ "Time-based join conditions must reference the time attribute of both input tables.")
+ }
+
+ // combine the remaining conditions into a single condition
+ val remainCondition =
+ remainTerms match {
+ case Seq() => None
+ case _ =>
+ // Converts logical field references to physical ones.
+ Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
+ RelOptUtil.andJoinFilters(rexBuilder, l, r)
+ }))
+ }
+
+ val isRowTime: Boolean = timeTerms(0)._1 match {
+ case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
+ case _ => true
+ }
+ (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
+ }
+
+ /**
+ * Splits a join condition into time conditions and non-time conditions.
+ *
+ * @return (Seq(timeTerms), Seq(remainTerms))
+ */
+ private def analyzeConditionTermType(
+ conditionTerm: RexNode,
+ leftFieldCount: Int,
+ inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], Seq[RexNode]) = {
+
+ conditionTerm match {
+ case c: RexCall if Seq(SqlKind.GREATER_THAN, SqlKind.GREATER_THAN_OR_EQUAL,
+ SqlKind.LESS_THAN, SqlKind.LESS_THAN_OR_EQUAL).contains(c.getKind) =>
+ val timeIndicators = extractTimeIndicatorAccesses(c, leftFieldCount, inputType)
+ timeIndicators match {
+ case Seq() =>
+ (Seq(), Seq(c))
+ case Seq(v1, v2) =>
+ if (v1._1 != v2._1) {
+ throw new TableException(
+ "Both time attributes in a join condition must be of the same type.")
+ }
+ if (v1._2 == v2._2) {
+ throw new TableException("Time-based join conditions " +
+ "must reference the time attribute of both input tables.")
+ }
+ (Seq((v1._1, v1._2, c)), Seq())
+ case _ =>
+ throw new TableException(
+ "Time-based join conditions must reference the time attribute of both input tables.")
+ }
+ case other =>
+ val timeIndicators = extractTimeIndicatorAccesses(other, leftFieldCount, inputType)
+ timeIndicators match {
+ case Seq() =>
+ (Seq(), Seq(other))
+ case _ =>
+ throw new TableException("Time indicators can not be used in non time-condition.")
+ }
+ }
+ }
+
+ /**
+ * Extracts all time indicator attributes that are accessed in an expression.
+ *
+ * @return Seq of (time attribute type, whether the attribute is from the left input)
+ */
+ def extractTimeIndicatorAccesses(
+ expression: RexNode,
+ leftFieldCount: Int,
+ inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
+
+ expression match {
+ case i: RexInputRef =>
+ val idx = i.getIndex
+ inputType.getFieldList.get(idx).getType match {
+ case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
+ // left table time indicator
+ Seq((t, true))
+ case t: TimeIndicatorRelDataType =>
+ // right table time indicator
+ Seq((t, false))
+ case _ => Seq()
+ }
+ case c: RexCall =>
+ c.operands.asScala
+ .map(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
+ .reduce(_ ++ _)
+ case _ => Seq()
+ }
+ }
+
+ /**
+ * Computes the absolute bound on the left operand of a comparison expression and
+ * whether the bound is an upper or lower bound.
+ *
+ * @return window boundary, is left lower bound
+ */
+ def extractTimeOffsetFromCondition(
+ timeTerm: RexNode,
+ isLeftExprBelongLeftTable: Boolean,
+ rexBuilder: RexBuilder,
+ config: TableConfig): (Long, Boolean) = {
+
+ val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
+
+ val isLeftLowerBound: Boolean =
+ timeTerm.getKind match {
+ // e.g. a.proctime > b.proctime - 5 sec, this is the lower bound of a and the value is -5
+ // e.g. b.proctime > a.proctime - 5 sec, this is not a lower bound of a but an upper bound
+ case kind@(SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
+ isLeftExprBelongLeftTable
+ // e.g. a.proctime < b.proctime + 5 sec, then the upper bound of a is 5
+ case kind@(SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
+ !isLeftExprBelongLeftTable
+ case _ =>
+ throw new TableException("Unsupported time-condition.")
+ }
+
+ val (leftLiteral, rightLiteral) =
+ reduceTimeExpression(
+ timeCall.operands.get(0),
+ timeCall.operands.get(1),
+ rexBuilder,
+ config)
+ val tmpTimeOffset: Long =
+ if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else leftLiteral - rightLiteral
+
+ val boundary =
+ tmpTimeOffset.signum * (
+ if (timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == SqlKind.GREATER_THAN) {
+ tmpTimeOffset.abs - 1
+ } else {
+ tmpTimeOffset.abs
+ })
+
+ (boundary, isLeftLowerBound)
+ }
+
+ /**
+ * Calculates the time boundary by replacing the time attribute by a zero literal
+ * and reducing the expression.
+ * For example:
+ * b.proctime - interval '1' second - interval '2' second will be translated to
+ * 0 - 1000 - 2000
+ */
+ private def reduceTimeExpression(
+ leftRexNode: RexNode,
+ rightRexNode: RexNode,
+ rexBuilder: RexBuilder,
+ config: TableConfig): (Long, Long) = {
+
+ /**
+ * replace the rowtime/proctime with zero literal.
+ */
+ def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
+ expr match {
+ case c: RexCall =>
+ // replace in call operands
+ val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral(_)).asJava
+ rexBuilder.makeCall(c.getType, c.getOperator, newOps)
+ case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) =>
+ // replace with timestamp
+ rexBuilder.makeZeroLiteral(expr.getType)
+ case _: RexInputRef =>
+ throw new TableException("Time join condition may only reference time indicator fields.")
+ case _ => expr
+ }
+ }
+
+ val literalLeftRex = replaceTimeFieldWithLiteral(leftRexNode)
+ val literalRightRex = replaceTimeFieldWithLiteral(rightRexNode)
+
+ val exprReducer = new ExpressionReducer(config)
+ val originList = new util.ArrayList[RexNode]()
+ originList.add(literalLeftRex)
+ originList.add(literalRightRex)
+ val reduceList = new util.ArrayList[RexNode]()
+ exprReducer.reduce(rexBuilder, originList, reduceList)
+
+ val literals = reduceList.asScala.map(f => f match {
+ case literal: RexLiteral =>
+ literal.getValue2.asInstanceOf[Long]
+ case _ =>
+ throw new TableException(
+ "Time condition may only consist of time attributes, literals, and arithmetic operators.")
+ })
+
+ (literals(0), literals(1))
+ }
+
+
+ /**
+ * Generates a function that evaluates the other non-equi join conditions.
+ *
+ * @param config table env config
+ * @param joinType join type to determine whether the inputs can be null
+ * @param leftType left stream type
+ * @param rightType right stream type
+ * @param returnType return type
+ * @param otherCondition non-equi condition
+ * @param ruleDescription rule description
+ */
+ private[flink] def generateJoinFunction(
+ config: TableConfig,
+ joinType: JoinRelType,
+ leftType: TypeInformation[Row],
+ rightType: TypeInformation[Row],
+ returnType: RowSchema,
+ otherCondition: Option[RexNode],
+ ruleDescription: String) = {
+
+ // whether input can be null
+ val nullCheck = joinType match {
+ case JoinRelType.INNER => false
+ case JoinRelType.LEFT => true
+ case JoinRelType.RIGHT => true
+ case JoinRelType.FULL => true
+ }
+
+ // generate other non-equi function code
+ val generator = new CodeGenerator(
+ config,
+ nullCheck,
+ leftType,
+ Some(rightType))
+
+ val conversion = generator.generateConverterResultExpression(
+ returnType.physicalTypeInfo,
+ returnType.physicalType.getFieldNames.asScala)
+
+ // if other condition is none, then output the result directly
+ val body = otherCondition match {
+ case None =>
+ s"""
+ |${conversion.code}
+ |${generator.collectorTerm}.collect(${conversion.resultTerm});
+ |""".stripMargin
+ case Some(remainCondition) =>
+ val genCond = generator.generateExpression(remainCondition)
+ s"""
+ |${genCond.code}
+ |if (${genCond.resultTerm}) {
+ | ${conversion.code}
+ | ${generator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
+ }
+
+ generator.generateFunction(
+ ruleDescription,
+ classOf[FlatJoinFunction[Row, Row, Row]],
+ body,
+ returnType.physicalTypeInfo)
+ }
+
+}
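As a worked example of the boundary extraction: for a.proctime >= b.proctime - INTERVAL '5' SECOND, reduceTimeExpression replaces both time attributes with zero literals and reduces the two sides to 0 and -5000. A self-contained sketch of how extractTimeOffsetFromCondition turns those literals into a bound (mirroring the code above; constants are illustrative):

    object BoundaryExtractionSketch {
      def main(args: Array[String]): Unit = {
        val leftLiteral  = 0L     // reduced "a.proctime"
        val rightLiteral = -5000L // reduced "b.proctime - INTERVAL '5' SECOND"
        val isLeftExprBelongLeftTable = true
        val isStrict = false      // >= is GREATER_THAN_OR_EQUAL, not strict

        val tmpTimeOffset =
          if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral
          else leftLiteral - rightLiteral
        // strict comparisons (<, >) exclude the boundary, shrinking it by 1 ms
        val boundary =
          tmpTimeOffset.signum * (if (isStrict) tmpTimeOffset.abs - 1 else tmpTimeOffset.abs)
        println(s"left lower bound: $boundary ms") // prints: left lower bound: -5000 ms
      }
    }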
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
new file mode 100644
index 0000000..769ba55
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.updateutils
+
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.datastream._
+import _root_.scala.collection.JavaConverters._
+
+object UpdateCheckUtils {
+
+ /** Validates that the plan produces only append changes. */
+ def isAppendOnly(plan: RelNode): Boolean = {
+ val appendOnlyValidator = new AppendOnlyValidator
+ appendOnlyValidator.go(plan)
+
+ appendOnlyValidator.isAppendOnly
+ }
+
+ /** Extracts the unique keys of the table produced by the plan. */
+ def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+ val keyExtractor = new UniqueKeyExtractor
+ keyExtractor.go(plan)
+ keyExtractor.keys
+ }
+
+ private class AppendOnlyValidator extends RelVisitor {
+
+ var isAppendOnly = true
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case s: DataStreamRel if s.producesUpdates =>
+ isAppendOnly = false
+ case _ =>
+ super.visit(node, ordinal, parent)
+ }
+ }
+ }
+
+ /** Identifies unique key fields in the output of a RelNode. */
+ private class UniqueKeyExtractor extends RelVisitor {
+
+ var keys: Option[Array[String]] = None
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case c: DataStreamCalc =>
+ super.visit(node, ordinal, parent)
+ // check if input has keys
+ if (keys.isDefined) {
+ // track keys forward
+ val inNames = c.getInput.getRowType.getFieldNames
+ val inOutNames = c.getProgram.getNamedProjects.asScala
+ .map(p => {
+ c.getProgram.expandLocalRef(p.left) match {
+ // output field is forwarded input field
+ case i: RexInputRef => (i.getIndex, p.right)
+ // output field is renamed input field
+ case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+ a.getOperands.get(0) match {
+ case ref: RexInputRef =>
+ (ref.getIndex, p.right)
+ case _ =>
+ (-1, p.right)
+ }
+ // output field is not forwarded from input
+ case _: RexNode => (-1, p.right)
+ }
+ })
+ // filter all non-forwarded fields
+ .filter(_._1 >= 0)
+ // resolve names of input fields
+ .map(io => (inNames.get(io._1), io._2))
+
+ // filter by input keys
+ val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+ // check if all keys have been preserved
+ if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+ // all keys have been preserved (but possibly renamed)
+ keys = Some(outKeys.toArray)
+ } else {
+ // some (or all) keys have been removed. The keys are no longer unique and are dropped
+ keys = None
+ }
+ }
+ case _: DataStreamOverAggregate =>
+ super.visit(node, ordinal, parent)
+ // keys are always forwarded by Over aggregate
+ case a: DataStreamGroupAggregate =>
+ // get grouping keys
+ val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+ keys = Some(groupKeys.toArray)
+ case w: DataStreamGroupWindowAggregate =>
+ // get grouping keys
+ val groupKeys =
+ w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+ // get window start and end time
+ val windowStartEnd = w.getWindowProperties.map(_.name)
+ // we have only a unique key if at least one window property is selected
+ if (windowStartEnd.nonEmpty) {
+ keys = Some(groupKeys ++ windowStartEnd)
+ }
+ case _: DataStreamRel =>
+ // anything else does not forward keys or might duplicate keys, so we can stop
+ keys = None
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
new file mode 100644
index 0000000..d4ff3f7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+ val data = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (3L, 3, "Hello"),
+ (4L, 4, "Hello"),
+ (5L, 5, "Hello"),
+ (6L, 6, "Hello"),
+ (7L, 7, "Hello World"),
+ (8L, 8, "Hello World"),
+ (20L, 20, "Hello World"))
+
+ /** Test processing-time inner join. */
+ @Test
+ def testProcessTimeInnerJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+ "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
+
+ val data1 = new mutable.MutableList[(Int, Long, String)]
+ data1.+=((1, 1L, "Hi1"))
+ data1.+=((1, 2L, "Hi2"))
+ data1.+=((1, 5L, "Hi3"))
+ data1.+=((2, 7L, "Hi5"))
+ data1.+=((1, 9L, "Hi6"))
+ data1.+=((1, 8L, "Hi8"))
+
+ val data2 = new mutable.MutableList[(Int, Long, String)]
+ data2.+=((1, 1L, "HiHi"))
+ data2.+=((2, 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+ /** Test processing-time inner join with an additional non-time join condition. */
+ @Test
+ def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+ "t1.proctime between t2.proctime - interval '5' second " +
+ "and t2.proctime + interval '5' second " +
+ "and t1.b > t2.b and t1.b + t2.b < 14"
+
+ val data1 = new mutable.MutableList[(String, Long, String)]
+ data1.+=(("1", 1L, "Hi1"))
+ data1.+=(("1", 2L, "Hi2"))
+ data1.+=(("1", 5L, "Hi3"))
+ data1.+=(("2", 7L, "Hi5"))
+ data1.+=(("1", 9L, "Hi6"))
+ data1.+=(("1", 8L, "Hi8"))
+
+ val data2 = new mutable.MutableList[(String, Long, String)]
+ data2.+=(("1", 5L, "HiHi"))
+ data2.+=(("2", 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+}
+
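Assuming all records of both inputs arrive within the 5-second window, the first query above would emit one row per matching pair on key a, i.e.:

  (1, HiHi, Hi1), (1, HiHi, Hi2), (1, HiHi, Hi3), (1, HiHi, Hi6), (1, HiHi, Hi8), (2, HeHe, Hi5)

The ITCases only verify that the queries translate and execute; results are not asserted because processing-time joins are inherently nondeterministic.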
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
new file mode 100644
index 0000000..15e8b89
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+ streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+ @Test
+ def testProcessingTimeInnerJoin(): Unit = {
+
+ val sqlQuery = "SELECT t1.a, t2.b " +
+ "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " +
+ "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamWindowJoin",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "proctime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "proctime")
+ ),
+ term("where",
+ "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+ "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+ term("join", "a, proctime, a0, b, proctime0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ /** The join must have time conditions. */
+ @Test(expected = classOf[TableException])
+ def testWindowJoinUnExistTimeCondition(): Unit = {
+ val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a"
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** The join must have exactly two time conditions. */
+ @Test(expected = classOf[TableException])
+ def testWindowJoinSingleTimeCondition(): Unit = {
+ val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
+ " and t1.proctime > t2.proctime - interval '5' second"
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** Both time attributes in a join condition must be of the same type. */
+ @Test(expected = classOf[TableException])
+ def testWindowJoinDiffTimeIndicator(): Unit = {
+ val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
+ " and t1.proctime > t2.proctime - interval '5' second " +
+ " and t1.proctime < t2.c + interval '5' second"
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** The time conditions must form a single conjunctive (AND) condition. */
+ @Test(expected = classOf[TableException])
+ def testWindowJoinNotCnfCondition(): Unit = {
+ val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
+ " and (t1.proctime > t2.proctime - interval '5' second " +
+ " or t1.proctime < t2.c + interval '5' second)"
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ @Test
+ def testJoinTimeBoundary(): Unit = {
+ verifyTimeBoundary(
+ "t1.proctime between t2.proctime - interval '1' hour " +
+ "and t2.proctime + interval '1' hour",
+ -3600000,
+ 3600000,
+ "proctime")
+
+ verifyTimeBoundary(
+ "t1.proctime > t2.proctime - interval '1' second and " +
+ "t1.proctime < t2.proctime + interval '1' second",
+ -999,
+ 999,
+ "proctime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c - interval '1' second and " +
+ "t1.c <= t2.c + interval '1' second",
+ -1000,
+ 1000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c and " +
+ "t1.c <= t2.c + interval '1' second",
+ 0,
+ 1000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c + interval '1' second and " +
+ "t1.c <= t2.c + interval '10' second",
+ 1000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t2.c - interval '1' second <= t1.c and " +
+ "t2.c + interval '10' second >= t1.c",
+ -1000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c - interval '2' second >= t2.c + interval '1' second -" +
+ "interval '10' second and " +
+ "t1.c <= t2.c + interval '10' second",
+ -7000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c - interval '10' second and " +
+ "t1.c <= t2.c - interval '5' second",
+ -10000,
+ -5000,
+ "rowtime")
+ }
+
+ @Test
+ def testJoinRemainConditionConvert(): Unit = {
+ streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+ streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+ val query =
+ "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+ "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
+ "t1.c > t2.c"
+ verifyRemainConditionConvert(
+ query,
+ ">($1, $3)")
+
+ val query1 =
+ "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+ "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
+ verifyRemainConditionConvert(
+ query1,
+ "")
+
+ streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
+ streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
+ val query2 =
+ "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
+ "t1.proctime >= t2.proctime - interval '10' second " +
+ "and t1.proctime <= t2.proctime - interval '5' second and " +
+ "t1.c > t2.c"
+ verifyRemainConditionConvert(
+ query2,
+ ">($2, $5)")
+ }
+
+ def verifyTimeBoundary(
+ timeSql: String,
+ expLeftBound: Long,
+ expRightBound: Long,
+ expTimeType: String): Unit = {
+ val query =
+ "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
+
+ val resultTable = streamUtil.tEnv.sql(query)
+ val relNode = resultTable.getRelNode
+ val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+ val rexNode = joinNode.getCondition
+ val (isRowTime, lowerBound, upperBound, conditionWithoutTime) =
+ WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
+ joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
+
+ val timeTypeStr =
+ if (isRowTime) "rowtime"
+ else "proctime"
+ assertEquals(expLeftBound, lowerBound)
+ assertEquals(expRightBound, upperBound)
+ assertEquals(expTimeType, timeTypeStr)
+ }
+
+ def verifyRemainConditionConvert(
+ query: String,
+ expectCondStr: String): Unit = {
+
+ val resultTable = streamUtil.tEnv.sql(query)
+ val relNode = resultTable.getRelNode
+ val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+ val joinInfo = joinNode.analyzeCondition
+ val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+ val (isRowTime, lowerBound, upperBound, remainCondition) =
+ WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
+ joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
+
+ val actual: String = remainCondition.getOrElse("").toString
+
+ assertEquals(expectCondStr, actual)
+ }
+}
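The expected bounds in testJoinTimeBoundary follow from normalizing each predicate to the form t1.time >= t2.time + lowerBound and t1.time <= t2.time + upperBound (all values in milliseconds). For the most involved case above:

  t1.c - 2000 >= t2.c + 1000 - 10000
  t1.c        >= t2.c - 7000           =>  lower bound = -7000
  t1.c        <= t2.c + 10000          =>  upper bound =  10000

Strict comparisons are tightened by one millisecond, which is why "t1.proctime > t2.proctime - interval '1' second" yields -999 instead of -1000.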
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
new file mode 100644
index 0000000..c008ed3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness}
+import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Test
+
+
+class JoinHarnessTest extends HarnessTestBase {
+
+ private val rT = new RowTypeInfo(Array[TypeInformation[_]](
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO),
+ Array("a", "b"))
+
+
+ val funcCode: String =
+ """
+ |public class TestJoinFunction
+ | extends org.apache.flink.api.common.functions.RichFlatJoinFunction {
+ | transient org.apache.flink.types.Row out =
+ | new org.apache.flink.types.Row(4);
+ | public TestJoinFunction() throws Exception {}
+ |
+ | @Override
+ | public void open(org.apache.flink.configuration.Configuration parameters)
+ | throws Exception {}
+ |
+ | @Override
+ | public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)
+ | throws Exception {
+ | org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;
+ | org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;
+ |
+ | out.setField(0, in1.getField(0));
+ | out.setField(1, in1.getField(1));
+ | out.setField(2, in2.getField(0));
+ | out.setField(3, in2.getField(1));
+ |
+ | c.collect(out);
+ |
+ | }
+ |
+ | @Override
+ | public void close() throws Exception {}
+ |}
+ """.stripMargin
+
+ /** a.proctime >= b.proctime - 10 and a.proctime <= b.proctime + 20 */
+ @Test
+ def testNormalProcTimeJoin(): Unit = {
+
+ val joinProcessFunc = new ProcTimeWindowInnerJoin(-10, 20, rT, rT, "TestJoinFunction", funcCode)
+
+ val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+ new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+ val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+ new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+ operator,
+ new TupleRowKeySelector[Integer](0),
+ new TupleRowKeySelector[Integer](0),
+ BasicTypeInfo.INT_TYPE_INFO,
+ 1, 1, 0)
+
+ testHarness.open()
+
+ // left stream input
+ testHarness.setProcessingTime(1)
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa"), true), 1))
+ assert(testHarness.numProcessingTimeTimers() == 1)
+ testHarness.setProcessingTime(2)
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(2: JInt, "bbb"), true), 2))
+ assert(testHarness.numProcessingTimeTimers() == 2)
+ testHarness.setProcessingTime(3)
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa2"), true), 3))
+ assert(testHarness.numKeyedStateEntries() == 4)
+ assert(testHarness.numProcessingTimeTimers() == 2)
+
+ // right stream input and output normally
+ testHarness.processElement2(new StreamRecord(
+ CRow(Row.of(1: JInt, "Hi1"), true), 3))
+ testHarness.setProcessingTime(4)
+ testHarness.processElement2(new StreamRecord(
+ CRow(Row.of(2: JInt, "Hello1"), true), 4))
+ assert(testHarness.numKeyedStateEntries() == 8)
+ assert(testHarness.numProcessingTimeTimers() == 4)
+
+ // the left stream record with timestamp 1 is expired
+ testHarness.setProcessingTime(12)
+ assert(testHarness.numKeyedStateEntries() == 8)
+ assert(testHarness.numProcessingTimeTimers() == 4)
+ testHarness.processElement2(new StreamRecord(
+ CRow(Row.of(1: JInt, "Hi2"), true), 12))
+
+ // the right stream record with timestamp 4 and all left stream records are expired
+ testHarness.setProcessingTime(25)
+ assert(testHarness.numKeyedStateEntries() == 2)
+ assert(testHarness.numProcessingTimeTimers() == 1)
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa3"), true), 25))
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(2: JInt, "bbb2"), true), 25))
+ testHarness.processElement2(new StreamRecord(
+ CRow(Row.of(2: JInt, "Hello2"), true), 25))
+
+ testHarness.setProcessingTime(45)
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(46)
+ assert(testHarness.numKeyedStateEntries() == 0)
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), true), 3))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa2", 1: JInt, "Hi1"), true), 3))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), true), 4))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa2", 1: JInt, "Hi2"), true), 12))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa3", 1: JInt, "Hi2"), true), 25))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(2: JInt, "bbb2", 2: JInt, "Hello2"), true), 25))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+
+ testHarness.close()
+ }
+
+ /** a.proctime >= b.proctime - 10 and a.proctime <= b.proctime - 5 */
+ @Test
+ def testProcTimeJoinSingleNeedStore(): Unit = {
+
+ val joinProcessFunc = new ProcTimeWindowInnerJoin(-10, -5, rT, rT, "TestJoinFunction", funcCode)
+
+ val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+ new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+ val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+ new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+ operator,
+ new TupleRowKeySelector[Integer](0),
+ new TupleRowKeySelector[Integer](0),
+ BasicTypeInfo.INT_TYPE_INFO,
+ 1, 1, 0)
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(1)
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa1"), true), 1))
+ testHarness.setProcessingTime(2)
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(2: JInt, "aaa2"), true), 2))
+ testHarness.setProcessingTime(3)
+ testHarness.processElement1(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa3"), true), 3))
+ assert(testHarness.numKeyedStateEntries() == 4)
+ assert(testHarness.numProcessingTimeTimers() == 2)
+
+ // "b" elements are not stored;
+ // no match because a.proctime <= b.proctime - 5 is not met
+ testHarness.processElement2(new StreamRecord(
+ CRow(Row.of(1: JInt, "bbb3"), true), 3))
+ assert(testHarness.numKeyedStateEntries() == 4)
+ assert(testHarness.numProcessingTimeTimers() == 2)
+
+ // a.proctime <= b.proctime - 5 is met
+ testHarness.setProcessingTime(7)
+ testHarness.processElement2(new StreamRecord(
+ CRow(Row.of(2: JInt, "bbb7"), true), 7))
+ assert(testHarness.numKeyedStateEntries() == 4)
+ assert(testHarness.numProcessingTimeTimers() == 2)
+
+ // the stream "a" record with timestamp 1 is expired
+ testHarness.setProcessingTime(12)
+ assert(testHarness.numKeyedStateEntries() == 4)
+ assert(testHarness.numProcessingTimeTimers() == 2)
+ testHarness.processElement2(new StreamRecord(
+ CRow(Row.of(1: JInt, "bbb12"), true), 12))
+
+ testHarness.setProcessingTime(13)
+ assert(testHarness.numKeyedStateEntries() == 2)
+ assert(testHarness.numProcessingTimeTimers() == 1)
+
+ testHarness.setProcessingTime(14)
+ assert(testHarness.numKeyedStateEntries() == 0)
+ assert(testHarness.numProcessingTimeTimers() == 0)
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+
+ testHarness.close()
+ }
+
+}
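The state assertions in testProcTimeJoinSingleNeedStore follow from how ProcTimeWindowInnerJoin sizes its two retention windows (see the diff of that class below). With bounds -10 and -5, roughly:

  leftStreamWinSize  = -leftLowerBound = 10    =>  "a" rows wait up to 10 ms for later "b" rows
  rightStreamWinSize = 0 (leftUpperBound < 0)  =>  no future "a" row can match a stored "b" row

so only the left input is kept in state, and arriving right rows join immediately against it.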
http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 10c79d0..b0500ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -123,4 +124,12 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
super.initializeState(operatorStateHandles);
}
+
+ public int numKeyedStateEntries() {
+ if (keyedStateBackend instanceof HeapKeyedStateBackend) {
+ return ((HeapKeyedStateBackend) keyedStateBackend).numStateEntries();
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
}
[3/3] flink git commit: [FLINK-6232] [table] Add SQL documentation
for time window join.
Posted by fh...@apache.org.
[FLINK-6232] [table] Add SQL documentation for time window join.
- Add support for window join predicates in WHERE clause.
- Refactoring of WindowJoinUtil.
- Minor refactorings of join classes.
This closes #4324.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/471345c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/471345c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/471345c0
Branch: refs/heads/master
Commit: 471345c0ea9930a582786cc08bd290d374b28c5a
Parents: ba6c59e
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 13 00:49:30 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 18 00:57:37 2017 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 29 ++
.../table/api/StreamTableEnvironment.scala | 10 +-
.../calcite/RelTimeIndicatorConverter.scala | 166 ++++----
.../flink/table/plan/nodes/CommonCalc.scala | 25 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 12 +-
.../table/plan/nodes/dataset/DataSetJoin.scala | 2 +-
.../plan/nodes/datastream/DataStreamCalc.scala | 25 +-
.../nodes/datastream/DataStreamWindowJoin.scala | 40 +-
.../datastream/DataStreamWindowJoinRule.scala | 55 ++-
.../table/plan/util/UpdatingPlanChecker.scala | 129 ++++++
.../runtime/join/ProcTimeWindowInnerJoin.scala | 134 +++---
.../table/runtime/join/WindowJoinUtil.scala | 408 ++++++++++++-------
.../table/updateutils/UpdateCheckUtils.scala | 128 ------
.../table/api/scala/stream/sql/JoinITCase.scala | 117 ------
.../table/api/scala/stream/sql/JoinTest.scala | 235 -----------
.../flink/table/api/stream/sql/JoinTest.scala | 250 ++++++++++++
.../sql/validation/JoinValidationTest.scala | 95 +++++
.../plan/TimeIndicatorConversionTest.scala | 2 +-
.../table/runtime/harness/JoinHarnessTest.scala | 61 ++-
.../table/runtime/stream/sql/JoinITCase.scala | 106 +++++
20 files changed, 1167 insertions(+), 862 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 81aa3d9..e677b60 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -369,6 +369,35 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
</td>
</tr>
<tr>
+ <td><strong>Time-windowed Join</strong><br>
+ <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Streaming</span>
+ </td>
+ <td>
+ <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+ <p>A time-windowed join requires a special join condition that bounds the time on both sides. This can be done by either two appropriate range predicates (<code> <, <=, >=, ></code>) or a <code>BETWEEN</code> predicate that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+ <ul>
+ <li>Time predicates must compare time attributes of both input tables.</li>
+ <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
+ <li>Only range predicates are valid time predicates.</li>
+ <li>Non-time predicates must not access a time attribute.</li>
+ </ul>
+ </p>
+
+ <p><b>Note:</b> Currently, only processing time window joins and <code>INNER</code> joins are supported.</p>
+
+{% highlight sql %}
+SELECT *
+FROM Orders o, Shipments s
+WHERE o.id = s.orderId AND
+ o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+The example above joins all orders with their corresponding shipments if the order was shipped within four hours after it was received.
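+
+<p>The <code>BETWEEN</code> predicate is equivalent to a pair of range predicates, so the join above could also be written as:</p>
+
+{% highlight sql %}
+SELECT *
+FROM Orders o, Shipments s
+WHERE o.id = s.orderId AND
+      o.ordertime >= s.shiptime - INTERVAL '4' HOUR AND
+      o.ordertime <= s.shiptime
+{% endhighlight %}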
+ </td>
+ </tr>
+ <tr>
<td>
<strong>Expanding arrays into a relation</strong><br>
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f026824..669b017 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -45,12 +45,12 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
-import org.apache.flink.table.updateutils.UpdateCheckUtils
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
@@ -174,10 +174,10 @@ abstract class StreamTableEnvironment(
// optimize plan
val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
// check for append only table
- val isAppendOnlyTable = UpdateCheckUtils.isAppendOnly(optimizedPlan)
+ val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(optimizedPlan)
upsertSink.setIsAppendOnly(isAppendOnlyTable)
// extract unique key fields
- val tableKeys: Option[Array[String]] = UpdateCheckUtils.getUniqueKeyFields(optimizedPlan)
+ val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
// check that we have keys if the table has changes (is not append-only)
tableKeys match {
case Some(keys) => upsertSink.setKeyFields(keys)
@@ -201,7 +201,7 @@ abstract class StreamTableEnvironment(
// optimize plan
val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
// verify table is an insert-only (append-only) table
- if (!UpdateCheckUtils.isAppendOnly(optimizedPlan)) {
+ if (!UpdatingPlanChecker.isAppendOnly(optimizedPlan)) {
throw new TableException(
"AppendStreamTableSink requires that Table has only insert changes.")
}
@@ -651,7 +651,7 @@ abstract class StreamTableEnvironment(
(implicit tpe: TypeInformation[A]): DataStream[A] = {
// if no change flags are requested, verify table is an insert-only (append-only) table.
- if (!withChangeFlag && !UpdateCheckUtils.isAppendOnly(logicalPlan)) {
+ if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(logicalPlan)) {
throw new TableException(
"Table is not an append-only table. " +
"Use the toRetractStream() in order to handle add and retract messages.")
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 32d6f01..d76613e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -42,9 +42,9 @@ import scala.collection.mutable
class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
private val timestamp = rexBuilder
- .getTypeFactory
- .asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ .getTypeFactory
+ .asInstanceOf[FlinkTypeFactory]
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
override def visit(intersect: LogicalIntersect): RelNode =
throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -138,15 +138,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
// visit children and update inputs
val input = filter.getInput.accept(this)
- // check if input field contains time indicator type
- // materialize field if no time indicator is present anymore
- // if input field is already materialized, change to timestamp type
- val materializer = new RexTimeIndicatorMaterializer(
- rexBuilder,
- input.getRowType.getFieldList.map(_.getType))
-
- val condition = filter.getCondition.accept(materializer)
- LogicalFilter.create(input, condition)
+ // We do not materialize time indicators in conditions because they can be locally evaluated.
+ // Some conditions are evaluated by special operators (e.g., time window joins).
+ // Time indicators in remaining conditions are materialized by Calc before the code generation.
+ LogicalFilter.create(input, filter.getCondition)
}
override def visit(project: LogicalProject): RelNode = {
@@ -306,68 +301,6 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
updatedAggCalls)
}
- class RexTimeIndicatorMaterializer(
- private val rexBuilder: RexBuilder,
- private val input: Seq[RelDataType])
- extends RexShuttle {
-
- override def visitInputRef(inputRef: RexInputRef): RexNode = {
- // reference is interesting
- if (isTimeIndicatorType(inputRef.getType)) {
- val resolvedRefType = input(inputRef.getIndex)
- // input is a valid time indicator
- if (isTimeIndicatorType(resolvedRefType)) {
- inputRef
- }
- // input has been materialized
- else {
- new RexInputRef(inputRef.getIndex, resolvedRefType)
- }
- }
- // reference is a regular field
- else {
- super.visitInputRef(inputRef)
- }
- }
-
- override def visitCall(call: RexCall): RexNode = {
- val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
-
- // materialize operands with time indicators
- val materializedOperands = updatedCall.getOperator match {
-
- // skip materialization for special operators
- case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
- updatedCall.getOperands.toList
-
- case _ =>
- updatedCall.getOperands.map { o =>
- if (isTimeIndicatorType(o.getType)) {
- rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
- } else {
- o
- }
- }
- }
-
- // remove time indicator return type
- updatedCall.getOperator match {
-
- // we do not modify AS if operand has not been materialized
- case SqlStdOperatorTable.AS if
- isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
- updatedCall
-
- // materialize function's result and operands
- case _ if isTimeIndicatorType(updatedCall.getType) =>
- updatedCall.clone(timestamp, materializedOperands)
-
- // materialize function's operands only
- case _ =>
- updatedCall.clone(updatedCall.getType, materializedOperands)
- }
- }
- }
}
object RelTimeIndicatorConverter {
@@ -421,4 +354,89 @@ object RelTimeIndicatorConverter {
new RelRecordType(fields)
}
+
+ /**
+ * Materializes time indicator accesses in an expression.
+ *
+ * @param expr The expression in which time indicators are materialized.
+ * @param rowType The input schema of the expression.
+ * @param rexBuilder A RexBuilder.
+ *
+ * @return The expression with materialized time indicators.
+ */
+ def convertExpression(expr: RexNode, rowType: RelDataType, rexBuilder: RexBuilder): RexNode = {
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ rowType.getFieldList.map(_.getType))
+
+ expr.accept(materializer)
+ }
+}
+
+class RexTimeIndicatorMaterializer(
+ private val rexBuilder: RexBuilder,
+ private val input: Seq[RelDataType])
+ extends RexShuttle {
+
+ private val timestamp = rexBuilder
+ .getTypeFactory
+ .asInstanceOf[FlinkTypeFactory]
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ // reference is interesting
+ if (isTimeIndicatorType(inputRef.getType)) {
+ val resolvedRefType = input(inputRef.getIndex)
+ // input is a valid time indicator
+ if (isTimeIndicatorType(resolvedRefType)) {
+ inputRef
+ }
+ // input has been materialized
+ else {
+ new RexInputRef(inputRef.getIndex, resolvedRefType)
+ }
+ }
+ // reference is a regular field
+ else {
+ super.visitInputRef(inputRef)
+ }
+ }
+
+ override def visitCall(call: RexCall): RexNode = {
+ val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+ // materialize operands with time indicators
+ val materializedOperands = updatedCall.getOperator match {
+
+ // skip materialization for special operators
+ case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+ updatedCall.getOperands.toList
+
+ case _ =>
+ updatedCall.getOperands.map { o =>
+ if (isTimeIndicatorType(o.getType)) {
+ rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+ } else {
+ o
+ }
+ }
+ }
+
+ // remove time indicator return type
+ updatedCall.getOperator match {
+
+ // we do not modify AS if operand has not been materialized
+ case SqlStdOperatorTable.AS if
+ isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
+ updatedCall
+
+ // materialize function's result and operands
+ case _ if isTimeIndicatorType(updatedCall.getType) =>
+ updatedCall.clone(timestamp, materializedOperands)
+
+ // materialize function's operands only
+ case _ =>
+ updatedCall.clone(updatedCall.getType, materializedOperands)
+ }
+ }
}
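In effect, the extracted RexTimeIndicatorMaterializer rewrites expressions as sketched here (x stands for an arbitrary literal): over a schema (a INT, proctime TIME ATTRIBUTE), a condition AND(=(a, 5), >(proctime, x)) keeps =(a, 5) untouched but wraps the time attribute access into >(TIME_MATERIALIZATION(proctime), x). Operands of the window operators TUMBLE, HOP, and SESSION are deliberately left unmaterialized, and calls that would return a time indicator are re-typed to plain TIMESTAMP.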
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 693924e..3e355ff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -22,12 +22,10 @@ import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
import org.apache.calcite.rex._
import org.apache.flink.api.common.functions.Function
import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.types.Row
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
trait CommonCalc {
@@ -37,39 +35,26 @@ trait CommonCalc {
ruleDescription: String,
inputSchema: RowSchema,
returnSchema: RowSchema,
- calcProgram: RexProgram,
+ calcProjection: Seq[RexNode],
+ calcCondition: Option[RexNode],
config: TableConfig,
functionClass: Class[T]):
GeneratedFunction[T, Row] = {
- val expandedExpressions = calcProgram
- .getProjectList
- .map(expr => calcProgram.expandLocalRef(expr))
- // time indicator fields must not be part of the code generation
- .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
- // update indices
- .map(expr => inputSchema.mapRexNode(expr))
-
- val condition = if (calcProgram.getCondition != null) {
- inputSchema.mapRexNode(calcProgram.expandLocalRef(calcProgram.getCondition))
- } else {
- null
- }
-
val projection = generator.generateResultExpression(
returnSchema.physicalTypeInfo,
returnSchema.physicalFieldNames,
- expandedExpressions)
+ calcProjection)
// only projection
- val body = if (condition == null) {
+ val body = if (calcCondition.isEmpty) {
s"""
|${projection.code}
|${generator.collectorTerm}.collect(${projection.resultTerm});
|""".stripMargin
}
else {
- val filterCondition = generator.generateExpression(condition)
+ val filterCondition = generator.generateExpression(calcCondition.get)
// only filter
if (projection == null) {
s"""
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index a923acc..7417eb2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -35,6 +35,8 @@ import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.FlatMapRunner
import org.apache.flink.types.Row
+import scala.collection.JavaConverters._
+
/**
* Flink RelNode which matches along with LogicalCalc.
*
@@ -90,12 +92,20 @@ class DataSetCalc(
val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+ val projection = calcProgram.getProjectList.asScala.map(calcProgram.expandLocalRef)
+ val condition = if (calcProgram.getCondition != null) {
+ Some(calcProgram.expandLocalRef(calcProgram.getCondition))
+ } else {
+ None
+ }
+
val genFunction = generateFunction(
generator,
ruleDescription,
new RowSchema(getInput.getRowType),
new RowSchema(getRowType),
- calcProgram,
+ projection,
+ condition,
config,
classOf[FlatMapFunction[Row, Row]])
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index e8f3b82..1583e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -58,7 +58,7 @@ class DataSetJoin(
with CommonJoin
with DataSetRel {
- override def deriveRowType() = rowRelDataType
+ override def deriveRowType(): RelDataType = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetJoin(
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index d626c46..2e00330 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -28,11 +28,14 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
import org.apache.flink.table.plan.nodes.CommonCalc
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.CRowProcessRunner
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import scala.collection.JavaConverters._
+
/**
* Flink RelNode which matches along with FlatMapOperator.
*
@@ -93,6 +96,25 @@ class DataStreamCalc(
val inputDataStream =
getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+ // materialize time attributes in condition
+ val condition = if (calcProgram.getCondition != null) {
+ val materializedCondition = RelTimeIndicatorConverter.convertExpression(
+ calcProgram.expandLocalRef(calcProgram.getCondition),
+ inputSchema.logicalType,
+ cluster.getRexBuilder)
+ Some(materializedCondition)
+ } else {
+ None
+ }
+
+ // filter out time attributes
+ val projection = calcProgram.getProjectList.asScala
+ .map(calcProgram.expandLocalRef)
+ // time indicator fields must not be part of the code generation
+ .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
+ // update indices
+ .map(expr => inputSchema.mapRexNode(expr))
+
val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo)
val genFunction = generateFunction(
@@ -100,7 +122,8 @@ class DataStreamCalc(
ruleDescription,
inputSchema,
schema,
- calcProgram,
+ projection,
+ condition,
config,
classOf[ProcessFunction[CRow, CRow]])
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 1315a79..987947c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.calcite.rex.RexNode
@@ -27,12 +28,12 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.plan.nodes.CommonJoin
import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.updateutils.UpdateCheckUtils
/**
- * Flink RelNode which matches along with JoinOperator and its related operations.
+ * RelNode for a time windowed stream join.
*/
class DataStreamWindowJoin(
cluster: RelOptCluster,
@@ -53,7 +54,7 @@ class DataStreamWindowJoin(
with CommonJoin
with DataStreamRel {
- override def deriveRowType() = schema.logicalType
+ override def deriveRowType(): RelDataType = schema.logicalType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamWindowJoin(
@@ -96,8 +97,8 @@ class DataStreamWindowJoin(
val config = tableEnv.getConfig
- val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
- val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+ val isLeftAppendOnly = UpdatingPlanChecker.isAppendOnly(left)
+ val isRightAppendOnly = UpdatingPlanChecker.isAppendOnly(right)
if (!isLeftAppendOnly || !isRightAppendOnly) {
throw new TableException(
"Windowed stream join does not support updates.")
@@ -124,21 +125,20 @@ class DataStreamWindowJoin(
joinType match {
case JoinRelType.INNER =>
- isRowTime match {
- case false =>
- // Proctime JoinCoProcessFunction
- createProcTimeInnerJoinFunction(
- leftDataStream,
- rightDataStream,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
- )
- case true =>
- // RowTime JoinCoProcessFunction
- throw new TableException(
- "RowTime inner join between stream and stream is not supported yet.")
+ if (isRowTime) {
+ // RowTime JoinCoProcessFunction
+ throw new TableException(
+ "RowTime inner join between stream and stream is not supported yet.")
+ } else {
+ // Proctime JoinCoProcessFunction
+ createProcTimeInnerJoinFunction(
+ leftDataStream,
+ rightDataStream,
+ joinFunction.name,
+ joinFunction.code,
+ leftKeys,
+ rightKeys
+ )
}
case JoinRelType.FULL =>
throw new TableException(
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
index c7a190f..2075689 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -21,13 +21,16 @@ package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamWindowJoin
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.join.WindowJoinUtil
+import scala.collection.JavaConverters._
+
class DataStreamWindowJoinRule
extends ConverterRule(
classOf[FlinkLogicalJoin],
@@ -39,18 +42,36 @@ class DataStreamWindowJoinRule
val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
val joinInfo = join.analyzeCondition
- try {
- WindowJoinUtil.analyzeTimeBoundary(
- joinInfo.getRemaining(join.getCluster.getRexBuilder),
- join.getLeft.getRowType.getFieldCount,
- new RowSchema(join.getRowType),
- join.getCluster.getRexBuilder,
- TableConfig.DEFAULT)
- true
- } catch {
- case _: TableException =>
+ val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+ joinInfo.getRemaining(join.getCluster.getRexBuilder),
+ join.getLeft.getRowType.getFieldCount,
+ join.getRowType,
+ join.getCluster.getRexBuilder,
+ TableConfig.DEFAULT)
+
+ // remaining predicate must not access time attributes
+ val remainingPredsAccessTime = remainingPreds.isDefined &&
+ WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
+
+ if (windowBounds.isDefined) {
+ if (windowBounds.get.isEventTime) {
+ // we cannot handle event-time window joins yet
false
+ } else {
+ // Check that no event-time attributes are in the input.
+ // The proc-time join implementation does ensure that record timestamps are correctly set.
+ // It is always the timestamp of the later arriving record.
+ // We rely on projection pushdown to remove unused attributes before the join.
+ val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+ .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+ !remainingPredsAccessTime && !rowTimeAttrInOutput
+ }
+ } else {
+ // the given join does not have valid window bounds. We cannot translate it.
+ false
}
+
}
override def convert(rel: RelNode): RelNode = {
@@ -63,11 +84,11 @@ class DataStreamWindowJoinRule
val leftRowSchema = new RowSchema(convLeft.getRowType)
val rightRowSchema = new RowSchema(convRight.getRowType)
- val (isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition) =
- WindowJoinUtil.analyzeTimeBoundary(
+ val (windowBounds, remainCondition) =
+ WindowJoinUtil.extractWindowBoundsFromPredicate(
joinInfo.getRemaining(join.getCluster.getRexBuilder),
leftRowSchema.logicalArity,
- new RowSchema(join.getRowType),
+ join.getRowType,
join.getCluster.getRexBuilder,
TableConfig.DEFAULT)
@@ -81,9 +102,9 @@ class DataStreamWindowJoinRule
leftRowSchema,
rightRowSchema,
new RowSchema(rel.getRowType),
- isRowTime,
- leftLowerBoundary,
- leftUpperBoundary,
+ windowBounds.get.isEventTime,
+ windowBounds.get.leftLowerBound,
+ windowBounds.get.leftUpperBound,
remainCondition,
description)
}
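For intuition, the updated matches() would accept a hypothetical query like the following (table and column names are illustrative):

  SELECT o.amount, s.status
  FROM Orders o JOIN Shipments s ON o.id = s.orderId
    AND o.proctime BETWEEN s.proctime - INTERVAL '1' MINUTE AND s.proctime
    AND o.amount > 10

but would reject the same join with rowtime bounds (event-time window joins are not supported yet), with a residual predicate that accesses a time attribute, or with a rowtime attribute in the join output.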
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
new file mode 100644
index 0000000..6a160f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.datastream._
+
+import _root_.scala.collection.JavaConverters._
+
+object UpdatingPlanChecker {
+
+ /** Validates that the plan produces only append changes. */
+ def isAppendOnly(plan: RelNode): Boolean = {
+ val appendOnlyValidator = new AppendOnlyValidator
+ appendOnlyValidator.go(plan)
+
+ appendOnlyValidator.isAppendOnly
+ }
+
+ /** Extracts the unique keys of the table produced by the plan. */
+ def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+ val keyExtractor = new UniqueKeyExtractor
+ keyExtractor.go(plan)
+ keyExtractor.keys
+ }
+
+ private class AppendOnlyValidator extends RelVisitor {
+
+ var isAppendOnly = true
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case s: DataStreamRel if s.producesUpdates =>
+ isAppendOnly = false
+ case _ =>
+ super.visit(node, ordinal, parent)
+ }
+ }
+ }
+
+ /** Identifies unique key fields in the output of a RelNode. */
+ private class UniqueKeyExtractor extends RelVisitor {
+
+ var keys: Option[Array[String]] = None
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case c: DataStreamCalc =>
+ super.visit(node, ordinal, parent)
+ // check if input has keys
+ if (keys.isDefined) {
+ // track keys forward
+ val inNames = c.getInput.getRowType.getFieldNames
+ val inOutNames = c.getProgram.getNamedProjects.asScala
+ .map(p => {
+ c.getProgram.expandLocalRef(p.left) match {
+ // output field is forwarded input field
+ case i: RexInputRef => (i.getIndex, p.right)
+ // output field is renamed input field
+ case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+ a.getOperands.get(0) match {
+ case ref: RexInputRef =>
+ (ref.getIndex, p.right)
+ case _ =>
+ (-1, p.right)
+ }
+ // output field is not forwarded from input
+ case _: RexNode => (-1, p.right)
+ }
+ })
+ // filter all non-forwarded fields
+ .filter(_._1 >= 0)
+ // resolve names of input fields
+ .map(io => (inNames.get(io._1), io._2))
+
+ // filter by input keys
+ val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+ // check if all keys have been preserved
+ if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+ // all keys have been preserved (but possibly renamed)
+ keys = Some(outKeys.toArray)
+ } else {
+ // some (or all) keys have been removed; the output no longer has a unique key
+ keys = None
+ }
+ }
+ case _: DataStreamOverAggregate =>
+ // keys are always forwarded by Over aggregate
+ super.visit(node, ordinal, parent)
+ case a: DataStreamGroupAggregate =>
+ // get grouping keys
+ val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+ keys = Some(groupKeys.toArray)
+ case w: DataStreamGroupWindowAggregate =>
+ // get grouping keys
+ val groupKeys =
+ w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+ // get window start and end time
+ val windowStartEnd = w.getWindowProperties.map(_.name)
+ // we have only a unique key if at least one window property is selected
+ if (windowStartEnd.nonEmpty) {
+ keys = Some(groupKeys ++ windowStartEnd)
+ }
+ case _: DataStreamRel =>
+ // anything else does not forward keys or might duplicate keys, so we can stop
+ keys = None
+ }
+ }
+
+ }
+
+}
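For context, a minimal sketch of how these checks are meant to be consumed when translating a plan for a sink. The helper sinkModeFor is illustrative and not part of this patch; only isAppendOnly and getUniqueKeyFields come from the class above.

import org.apache.calcite.rel.RelNode
import org.apache.flink.table.plan.util.UpdatingPlanChecker

// Decide which kind of stream sink an optimized plan can be emitted to.
def sinkModeFor(optimizedPlan: RelNode): String = {
  if (UpdatingPlanChecker.isAppendOnly(optimizedPlan)) {
    "append" // every result row is an insertion
  } else {
    UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan) match {
      case Some(keys) => s"upsert on [${keys.mkString(", ")}]" // updates keyed by a unique key
      case None => "retract" // general updates require retraction messages
    }
  }
}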
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
index 97d5ccc..8a3ba43 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -59,24 +59,26 @@ class ProcTimeWindowInnerJoin(
private var cRowWrapper: CRowWrappingCollector = _
- /** other condition function **/
+ // other condition function
private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
- /** tmp list to store expired records **/
- private var listToRemove: JList[Long] = _
+ // tmp list to collect the timestamps of expired rows
+ private var removeList: JList[Long] = _
- /** state to hold left stream element **/
+ // state to hold left stream element
private var row1MapState: MapState[Long, JList[Row]] = _
- /** state to hold right stream element **/
+ // state to hold right stream element
private var row2MapState: MapState[Long, JList[Row]] = _
- /** state to record last timer of left stream, 0 means no timer **/
+ // state to record last timer of left stream, 0 means no timer
private var timerState1: ValueState[Long] = _
- /** state to record last timer of right stream, 0 means no timer **/
+ // state to record last timer of right stream, 0 means no timer
private var timerState2: ValueState[Long] = _
- private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
- private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
+ // compute window sizes, i.e., how long to keep rows in state.
+ // window size of -1 means rows do not need to be put into state.
+ private val leftStreamWinSize: Long = if (leftLowerBound <= 0) -leftLowerBound else -1
+ private val rightStreamWinSize: Long = if (leftUpperBound >= 0) leftUpperBound else -1
val LOG = LoggerFactory.getLogger(this.getClass)
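For illustration, two hypothetical predicates and the window sizes they produce under the rules above:

// t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND
//                 AND t2.proctime + INTERVAL '5' SECOND
// => leftLowerBound = -5000, leftUpperBound = 5000
// => leftStreamWinSize = 5000   (a left row may join right rows arriving up to 5s later)
// => rightStreamWinSize = 5000  (a right row may join left rows arriving up to 5s later)
//
// t1.proctime BETWEEN t2.proctime - INTERVAL '10' SECOND
//                 AND t2.proctime - INTERVAL '5' SECOND
// => leftLowerBound = -10000, leftUpperBound = -5000
// => leftStreamWinSize = 10000  (a left row waits up to 10s for matching right rows)
// => rightStreamWinSize = -1    (a right row can never join a later left row, so it is not stored)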
@@ -90,7 +92,7 @@ class ProcTimeWindowInnerJoin(
LOG.debug("Instantiating JoinFunction.")
joinFunction = clazz.newInstance()
- listToRemove = new util.ArrayList[Long]()
+ removeList = new util.ArrayList[Long]()
cRowWrapper = new CRowWrappingCollector()
cRowWrapper.setChange(true)
@@ -140,7 +142,7 @@ class ProcTimeWindowInnerJoin(
row2MapState,
-leftUpperBound, // right stream lower
-leftLowerBound, // right stream upper
- true
+ isLeft = true
)
}
@@ -165,9 +167,9 @@ class ProcTimeWindowInnerJoin(
timerState2,
row2MapState,
row1MapState,
- leftLowerBound, // left stream upper
+ leftLowerBound, // left stream lower
leftUpperBound, // left stream upper
- false
+ isLeft = false
)
}
@@ -217,66 +219,82 @@ class ProcTimeWindowInnerJoin(
winSize: Long,
timerState: ValueState[Long],
rowMapState: MapState[Long, JList[Row]],
- oppoRowMapState: MapState[Long, JList[Row]],
- oppoLowerBound: Long,
- oppoUpperBound: Long,
+ otherRowMapState: MapState[Long, JList[Row]],
+ otherLowerBound: Long,
+ otherUpperBound: Long,
isLeft: Boolean): Unit = {
cRowWrapper.out = out
- val value = valueC.row
+ val row = valueC.row
val curProcessTime = ctx.timerService.currentProcessingTime
- val oppoLowerTime = curProcessTime + oppoLowerBound
- val oppoUpperTime = curProcessTime + oppoUpperBound
-
- // only when windowsize != 0, we need to store the element
- if (winSize != 0) {
- // register a timer to expire the element
- if (timerState.value == 0) {
- ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
- timerState.update(curProcessTime + winSize + 1)
- }
+ val otherLowerTime = curProcessTime + otherLowerBound
+ val otherUpperTime = curProcessTime + otherUpperBound
+ if (winSize >= 0) {
+ // put row into state for later joining.
+ // (winSize == 0) joins rows received in the same millisecond.
var rowList = rowMapState.get(curProcessTime)
if (rowList == null) {
rowList = new util.ArrayList[Row]()
}
- rowList.add(value)
+ rowList.add(row)
rowMapState.put(curProcessTime, rowList)
+ // register a timer to remove the row from state once it is expired
+ if (timerState.value == 0) {
+ val cleanupTime = curProcessTime + winSize + 1
+ ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+ timerState.update(cleanupTime)
+ }
}
- // loop the other stream elements
- val oppositeKeyIter = oppoRowMapState.keys().iterator()
- while (oppositeKeyIter.hasNext) {
- val eleTime = oppositeKeyIter.next()
- if (eleTime < oppoLowerTime) {
- listToRemove.add(eleTime)
- } else if (eleTime <= oppoUpperTime) {
- val oppoRowList = oppoRowMapState.get(eleTime)
- var i = 0
- if (isLeft) {
- while (i < oppoRowList.size) {
- joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
+ // join row with rows received from the other input
+ val otherTimeIter = otherRowMapState.keys().iterator()
+ if (isLeft) {
+ // go over all timestamps in the other input's state
+ while (otherTimeIter.hasNext) {
+ val otherTimestamp = otherTimeIter.next()
+ if (otherTimestamp < otherLowerTime) {
+ // other timestamp is expired. Remove it later.
+ removeList.add(otherTimestamp)
+ } else if (otherTimestamp <= otherUpperTime) {
+ // join row with all rows from the other input for this timestamp
+ val otherRows = otherRowMapState.get(otherTimestamp)
+ var i = 0
+ while (i < otherRows.size) {
+ joinFunction.join(row, otherRows.get(i), cRowWrapper)
i += 1
}
- } else {
- while (i < oppoRowList.size) {
- joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
+ }
+ }
+ } else {
+ // go over all timestamps in the other input's state
+ while (otherTimeIter.hasNext) {
+ val otherTimestamp = otherTimeIter.next()
+ if (otherTimestamp < otherLowerTime) {
+ // other timestamp is expired. Remove it later.
+ removeList.add(otherTimestamp)
+ } else if (otherTimestamp <= otherUpperTime) {
+ // join row with all rows from the other input for this timestamp
+ val otherRows = otherRowMapState.get(otherTimestamp)
+ var i = 0
+ while (i < otherRows.size) {
+ joinFunction.join(otherRows.get(i), row, cRowWrapper)
i += 1
}
}
}
}
- // expire records out-of-time
- var i = listToRemove.size - 1
+ // remove rows for expired timestamps
+ var i = removeList.size - 1
while (i >= 0) {
- oppoRowMapState.remove(listToRemove.get(i))
+ otherRowMapState.remove(removeList.get(i))
i -= 1
}
- listToRemove.clear()
+ removeList.clear()
}
/**
@@ -292,32 +310,34 @@ class ProcTimeWindowInnerJoin(
val expiredTime = curTime - winSize
val keyIter = rowMapState.keys().iterator()
- var nextTimer: Long = 0
+ var validTimestamp: Boolean = false
// Search for expired timestamps.
// If we find a non-expired timestamp, remember the timestamp and leave the loop.
// This way we find all expired timestamps if they are sorted without doing a full pass.
- while (keyIter.hasNext && nextTimer == 0) {
+ while (keyIter.hasNext && !validTimestamp) {
val recordTime = keyIter.next
if (recordTime < expiredTime) {
- listToRemove.add(recordTime)
+ removeList.add(recordTime)
} else {
- nextTimer = recordTime
+ // we found a timestamp that is still valid
+ validTimestamp = true
}
}
// Remove expired records from state
- var i = listToRemove.size - 1
+ var i = removeList.size - 1
while (i >= 0) {
- rowMapState.remove(listToRemove.get(i))
+ rowMapState.remove(removeList.get(i))
i -= 1
}
- listToRemove.clear()
+ removeList.clear()
// If the state has non-expired timestamps, register a new timer.
// Otherwise clean the complete state for this input.
- if (nextTimer != 0) {
- ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
- timerState.update(nextTimer + winSize + 1)
+ if (validTimestamp) {
+ val cleanupTime = curTime + winSize + 1
+ ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+ timerState.update(cleanupTime)
} else {
timerState.clear()
rowMapState.clear()
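The timer-based cleanup can be traced with a small, hypothetical example:

// winSize = 5000; rows were stored at processing times 1000 and 4000,
// and a cleanup timer was registered for 1000 + 5000 + 1 = 6001.
// onTimer fires at curTime = 6001, so expiredTime = 6001 - 5000 = 1001:
//   - the entry at timestamp 1000 is expired and removed,
//   - the entry at timestamp 4000 is still valid, so a new timer is
//     registered for curTime + winSize + 1 = 11002 and the state is kept.
// Had no valid timestamp remained, timerState and rowMapState would be cleared.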
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index fabeeba..c38d915 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -17,7 +17,6 @@
*/
package org.apache.flink.table.runtime.join
-import java.math.{BigDecimal => JBigDecimal}
import java.util
import org.apache.calcite.plan.RelOptUtil
@@ -27,263 +26,362 @@ import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlKind
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
/**
- * An util class to help analyze and build join code .
+ * A utility class to optimize and generate window joins.
*/
object WindowJoinUtil {
+ case class WindowBounds(isEventTime: Boolean, leftLowerBound: Long, leftUpperBound: Long)
+
+ protected case class WindowBound(bound: Long, isLeftLower: Boolean)
+ protected case class TimePredicate(
+ isEventTime: Boolean,
+ leftInputOnLeftSide: Boolean,
+ pred: RexCall)
+ protected case class TimeAttributeAccess(isEventTime: Boolean, isLeftInput: Boolean)
+
/**
- * Analyze time-condtion to get time boundary for each stream and get the time type
- * and return remain condition.
+ * Extracts the window bounds from a join predicate.
+ *
+ * @param predicate join predicate
+ * @param leftLogicalFieldCnt number of attributes on the left join input
+ * @param inputSchema schema of the join result
+ * @param rexBuilder RexBuilder
+ * @param config TableConfig
*
- * @param condition join condition
- * @param leftLogicalFieldCnt left stream logical field num
- * @param inputSchema join rowtype schema
- * @param rexBuilder util to build rexNode
- * @param config table environment config
- * @return isRowTime, left lower boundary, right lower boundary, remain condition
+ * @return A Tuple2 of extracted window bounds and remaining predicates.
*/
- private[flink] def analyzeTimeBoundary(
- condition: RexNode,
+ private[flink] def extractWindowBoundsFromPredicate(
+ predicate: RexNode,
leftLogicalFieldCnt: Int,
- inputSchema: RowSchema,
+ inputSchema: RelDataType,
rexBuilder: RexBuilder,
- config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+ config: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
// Converts the condition to conjunctive normal form (CNF)
- val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+ val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
// split the condition into time indicator condition and other condition
- val (timeTerms, remainTerms) = cnfCondition match {
- case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
- c.getOperands.asScala
- .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
- .reduceLeft((l, r) => {
- (l._1 ++ r._1, l._2 ++ r._2)
- })
- case _ =>
- throw new TableException("A time-based stream join requires exactly " +
- "two join predicates that bound the time in both directions.")
+ val (timePreds, otherPreds) = cnfCondition match {
+ // We need at least two comparison predicates for a properly bounded window join.
+ // So we need an AND expression for a valid window join.
+ case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+ c.getOperands.asScala
+ .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+ .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+ analyzed match {
+ case Left(timePred) => (preds._1 :+ timePred, preds._2)
+ case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+ }
+ })
+ case _ =>
+ // No valid window bounds. A windowed stream join requires two comparison predicates that
+ // bound the time in both directions.
+ return (None, Some(predicate))
}
- if (timeTerms.size != 2) {
- throw new TableException("A time-based stream join requires exactly " +
- "two join predicates that bound the time in both directions.")
+ if (timePreds.size != 2) {
+ // No valid window bounds. A windowed stream join requires two comparison predicates that
+ // bound the time in both directions.
+ return (None, Some(predicate))
}
- // extract time offset from the time indicator conditon
- val streamTimeOffsets =
- timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, config))
-
+ // assemble window bounds from predicates
+ val streamTimeOffsets = timePreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
val (leftLowerBound, leftUpperBound) =
streamTimeOffsets match {
- case Seq((x, true), (y, false)) => (x, y)
- case Seq((x, false), (y, true)) => (y, x)
+ case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+ (x.bound, y.bound)
+ case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+ (y.bound, x.bound)
case _ =>
- throw new TableException(
- "Time-based join conditions must reference the time attribute of both input tables.")
+ // A window join requires two comparison predicates that bound the time in both directions.
+ return (None, Some(predicate))
}
 // combine the remaining predicates into a single condition
val remainCondition =
- remainTerms match {
- case Seq() => None
+ otherPreds match {
+ case Seq() =>
+ None
case _ =>
- // Converts logical field references to physical ones.
- Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
- RelOptUtil.andJoinFilters(rexBuilder, l, r)
- }))
+ Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
}
- val isRowTime: Boolean = timeTerms(0)._1 match {
- case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
- case _ => true
- }
- (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
+ val bounds = Some(WindowBounds(timePreds.head.isEventTime, leftLowerBound, leftUpperBound))
+
+ (bounds, remainCondition)
}
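A hypothetical join condition traced through the extraction:

// ON t1.a = t2.a AND
//    t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND
//                    AND t2.proctime + INTERVAL '5' SECOND
//
// CNF yields three conjuncts:
//   =(a, a0)                                     -> other predicate
//   >=(proctime, -(proctime0, 5000))             -> time predicate (left lower bound)
//   <=(proctime, DATETIME_PLUS(proctime0, 5000)) -> time predicate (left upper bound)
//
// Result: (Some(WindowBounds(isEventTime = false, -5000, 5000)), Some(=(a, a0)))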
/**
- * Split the join conditions into time condition and non-time condition
+ * Analyzes a predicate and identifies whether it is a valid predicate for a window join.
+ * A valid window join predicate is a comparison predicate (<, <=, >=, >) that accesses
+ * time attributes of both inputs, each input on a different side of the condition.
+ * Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
+ *
+ * Examples:
+ * - left.rowtime > right.rowtime + 2.minutes => valid
+ * - left.proctime < right.rowtime + 2.minutes => invalid: different time type
+ * - left.rowtime == right.rowtime + 2.minutes => invalid: equality is not a supported comparison
+ * - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
*
- * @return (Seq(timeTerms), Seq(remainTerms)),
+ * If the predicate is a regular join predicate, i.e., it accesses no time attribute, it is
+ * returned as well.
+ *
+ * @return Either a valid time predicate (Left) or a valid non-time predicate (Right)
*/
- private def analyzeCondtionTermType(
- conditionTerm: RexNode,
+ private def identifyTimePredicate(
+ pred: RexNode,
leftFieldCount: Int,
- inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], Seq[RexNode]) = {
-
- conditionTerm match {
- case c: RexCall if Seq(SqlKind.GREATER_THAN, SqlKind.GREATER_THAN_OR_EQUAL,
- SqlKind.LESS_THAN, SqlKind.LESS_THAN_OR_EQUAL).contains(c.getKind) =>
- val timeIndicators = extractTimeIndicatorAccesses(c, leftFieldCount, inputType)
- timeIndicators match {
- case Seq() =>
- (Seq(), Seq(c))
- case Seq(v1, v2) =>
- if (v1._1 != v2._1) {
- throw new TableException(
- "Both time attributes in a join condition must be of the same type.")
+ inputType: RelDataType): Either[TimePredicate, RexNode] = {
+
+ pred match {
+ case c: RexCall =>
+ c.getKind match {
+ case SqlKind.GREATER_THAN |
+ SqlKind.GREATER_THAN_OR_EQUAL |
+ SqlKind.LESS_THAN |
+ SqlKind.LESS_THAN_OR_EQUAL =>
+
+ val leftTerm = c.getOperands.get(0)
+ val rightTerm = c.getOperands.get(1)
+
+ // validate that both sides of the condition do not access non-time attributes
+ if (accessesNonTimeAttribute(leftTerm, inputType) ||
+ accessesNonTimeAttribute(rightTerm, inputType)) {
+ return Right(pred)
}
- if (v1._2 == v2._2) {
- throw new TableException("Time-based join conditions " +
- "must reference the time attribute of both input tables.")
+
+ // get time attribute on left side of comparison
+ val leftTimeAttrAccess =
+ extractTimeAttributeAccesses(leftTerm, leftFieldCount, inputType) match {
+ case Seq() => None
+ case Seq(a) => Some(a)
+ case _ =>
+ // Window join predicate may only access a single time attribute on each side.
+ return Right(pred)
+ }
+
+ // get time attribute on right side of comparison
+ val rightTimeAccess =
+ extractTimeAttributeAccesses(rightTerm, leftFieldCount, inputType) match {
+ case Seq() => None
+ case Seq(a) => Some(a)
+ case _ =>
+ // Window join predicate may only access a single time attribute on each side.
+ return Right(pred)
+ }
+
+ // check if both sides of the condition access a time attribute,
+ // if both join inputs are accessed, and
+ // if both accesses are on the same time type (event-time or proc-time)
+ (leftTimeAttrAccess, rightTimeAccess) match {
+ case (None, None) =>
+ // neither left nor right accesses a time attribute. This is a regular join predicate
+ Right(c)
+ case (Some(_), None) | (None, Some(_)) =>
+ // Both sides of a window join predicate must access a time attribute.
+ Right(pred)
+ case (Some(left), Some(right)) if left.isEventTime != right.isEventTime =>
+ // Both time attributes in a window join predicate must be of the same type.
+ Right(pred)
+ case (Some(left), Some(right)) if left.isLeftInput == right.isLeftInput =>
+ // Window join predicates must reference the time attribute of both inputs.
+ Right(pred)
+ case (Some(left), Some(_)) =>
+ Left(TimePredicate(left.isEventTime, left.isLeftInput, c))
}
- (Seq((v1._1, v1._2, c)), Seq())
- case _ =>
- throw new TableException(
- "Time-based join conditions must reference the time attribute of both input tables.")
+ // not a comparison predicate.
+ case _ => Right(pred)
}
case other =>
- val timeIndicators = extractTimeIndicatorAccesses(other, leftFieldCount, inputType)
- timeIndicators match {
- case Seq() =>
- (Seq(), Seq(other))
- case _ =>
- throw new TableException("Time indicators can not be used in non time-condition.")
- }
+ Right(other)
}
}
/**
- * Extracts all time indicator attributes that are accessed in an expression.
+ * Extracts all time attributes that are accessed in an expression.
*
- * @return seq(timeType, is left input time indicator)
+ * @return A Seq of all time attributes accessed in the expression.
*/
- def extractTimeIndicatorAccesses(
- expression: RexNode,
+ def extractTimeAttributeAccesses(
+ expr: RexNode,
leftFieldCount: Int,
- inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
+ inputType: RelDataType): Seq[TimeAttributeAccess] = {
- expression match {
+ expr match {
case i: RexInputRef =>
+ // check if time attribute is accessed and from which input side
val idx = i.getIndex
inputType.getFieldList.get(idx).getType match {
- case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
- // left table time indicator
- Seq((t, true))
case t: TimeIndicatorRelDataType =>
- // right table time indicator
- Seq((t, false))
- case _ => Seq()
+ // time attribute access. Remember time type and side of input
+ val isLeftInput = idx < leftFieldCount
+ Seq(TimeAttributeAccess(t.isEventTime, isLeftInput))
+ case _ =>
+ // not a time attribute access.
+ Seq()
}
case c: RexCall =>
+ // concat time-attributes of all operands
c.operands.asScala
- .map(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
+ .map(extractTimeAttributeAccesses(_, leftFieldCount, inputType))
.reduce(_ ++ _)
case _ => Seq()
}
}
/**
+ * Checks if an expression accesses a time attribute.
+ *
+ * @param expr The expression to check.
+ * @param inputType The input type of the expression.
+ * @return True, if the expression accesses a time attribute. False otherwise.
+ */
+ def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+ expr match {
+ case i: RexInputRef =>
+ val accessedType = inputType.getFieldList.get(i.getIndex).getType
+ accessedType match {
+ case _: TimeIndicatorRelDataType => true
+ case _ => false
+ }
+ case c: RexCall =>
+ c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
+ case _ => false
+ }
+ }
+
+ /**
+ * Checks if an expression accesses a non-time attribute.
+ *
+ * @param expr The expression to check.
+ * @param inputType The input type of the expression.
+ * @return True, if the expression accesses a non-time attribute. False otherwise.
+ */
+ def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+ expr match {
+ case i: RexInputRef =>
+ val accessedType = inputType.getFieldList.get(i.getIndex).getType
+ accessedType match {
+ case _: TimeIndicatorRelDataType => false
+ case _ => true
+ }
+ case c: RexCall =>
+ c.operands.asScala.exists(accessesNonTimeAttribute(_, inputType))
+ case _ => false
+ }
+ }
+
+ /**
* Computes the absolute bound on the left operand of a comparison expression and
* whether the bound is an upper or lower bound.
*
 * @return The computed window bound, or None if the predicate cannot be reduced to constants.
*/
- def extractTimeOffsetFromCondition(
- timeTerm: RexNode,
- isLeftExprBelongLeftTable: Boolean,
+ def computeWindowBoundFromPredicate(
+ timePred: TimePredicate,
rexBuilder: RexBuilder,
- config: TableConfig): (Long, Boolean) = {
-
- val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
+ config: TableConfig): Option[WindowBound] = {
val isLeftLowerBound: Boolean =
- timeTerm.getKind match {
- // e.g a.proctime > b.proctime - 5 sec, then it's the lower bound of a and the value is -5
- // e.g b.proctime > a.proctime - 5 sec, then it's not the lower bound of a but upper bound
- case kind@(SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
- isLeftExprBelongLeftTable
- // e.g a.proctime < b.proctime + 5 sec, the the upper bound of a is 5
- case kind@(SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
- !isLeftExprBelongLeftTable
- case _ =>
- throw new TableException("Unsupported time-condition.")
+ timePred.pred.getKind match {
+ case (SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
+ timePred.leftInputOnLeftSide
+ case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
+ !timePred.leftInputOnLeftSide
}
- val (leftLiteral, rightLiteral) =
- reduceTimeExpression(
- timeCall.operands.get(0),
- timeCall.operands.get(1),
- rexBuilder,
- config)
- val tmpTimeOffset: Long =
- if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else leftLiteral - rightLiteral
-
- val boundary =
- tmpTimeOffset.signum * (
- if (timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == SqlKind.GREATER_THAN) {
- tmpTimeOffset.abs - 1
- } else {
- tmpTimeOffset.abs
- })
-
- (boundary, isLeftLowerBound)
+ // reduce predicate to constants to compute bounds
+ val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, config)
+
+ if (leftLiteral.isEmpty || rightLiteral.isEmpty) {
+ return None
+ }
+
+ // compute boundary
+ val tmpTimeOffset: Long = if (timePred.leftInputOnLeftSide) {
+ rightLiteral.get - leftLiteral.get
+ } else {
+ leftLiteral.get - rightLiteral.get
+ }
+ val boundary = timePred.pred.getKind match {
+ case SqlKind.LESS_THAN =>
+ tmpTimeOffset - 1
+ case SqlKind.GREATER_THAN =>
+ tmpTimeOffset + 1
+ case _ =>
+ tmpTimeOffset
+ }
+
+ Some(WindowBound(boundary, isLeftLowerBound))
}
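Worked arithmetic for the boundary adjustment above, consistent with the expectations in the JoinTest below:

// t1.proctime > t2.proctime - INTERVAL '1' SECOND
//   tmpTimeOffset = -1000, GREATER_THAN          => boundary = -1000 + 1 = -999 (strict lower bound tightened)
// t1.proctime < t2.proctime + INTERVAL '1' SECOND
//   tmpTimeOffset =  1000, LESS_THAN             => boundary =  1000 - 1 =  999 (strict upper bound tightened)
// t1.c >= t2.c - INTERVAL '1' SECOND
//   tmpTimeOffset = -1000, GREATER_THAN_OR_EQUAL => boundary = -1000 (inclusive, unchanged)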
/**
- * Calculates the time boundary by replacing the time attribute by a zero literal
- * and reducing the expression.
- * For example:
- * b.proctime - interval '1' second - interval '2' second will be translated to
- * 0 - 1000 - 2000
+ * Replaces the time attributes on both sides of a time predicate by a zero literal and
+ * reduces the expressions on both sides to a long literal.
+ *
+ * @param timePred The time predicate whose two sides are reduced.
+ * @param rexBuilder A RexBuilder
+ * @param config A TableConfig.
+ * @return The values of the reduced literals on both sides of the time comparison predicate.
*/
private def reduceTimeExpression(
- leftRexNode: RexNode,
- rightRexNode: RexNode,
+ timePred: TimePredicate,
rexBuilder: RexBuilder,
- config: TableConfig): (Long, Long) = {
+ config: TableConfig): (Option[Long], Option[Long]) = {
/**
- * replace the rowtime/proctime with zero literal.
+ * Replaces each time attribute by a zero literal.
*/
def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
expr match {
case c: RexCall =>
// replace in call operands
- val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral(_)).asJava
+ val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral).asJava
rexBuilder.makeCall(c.getType, c.getOperator, newOps)
case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) =>
// replace with timestamp
rexBuilder.makeZeroLiteral(expr.getType)
- case _: RexInputRef =>
- throw new TableException("Time join condition may only reference time indicator fields.")
case _ => expr
}
}
- val literalLeftRex = replaceTimeFieldWithLiteral(leftRexNode)
- val literalRightRex = replaceTimeFieldWithLiteral(rightRexNode)
+ val leftSide = timePred.pred.operands.get(0)
+ val rightSide = timePred.pred.operands.get(1)
+ val leftSideWithLiteral = replaceTimeFieldWithLiteral(leftSide)
+ val rightSideWithLiteral = replaceTimeFieldWithLiteral(rightSide)
+
+ // reduce expression to literal
val exprReducer = new ExpressionReducer(config)
val originList = new util.ArrayList[RexNode]()
- originList.add(literalLeftRex)
- originList.add(literalRightRex)
+ originList.add(leftSideWithLiteral)
+ originList.add(rightSideWithLiteral)
val reduceList = new util.ArrayList[RexNode]()
exprReducer.reduce(rexBuilder, originList, reduceList)
- val literals = reduceList.asScala.map(f => f match {
+ // extract bounds from reduced literal
+ val literals = reduceList.asScala.map {
case literal: RexLiteral =>
- literal.getValue2.asInstanceOf[Long]
+ Some(literal.getValue2.asInstanceOf[Long])
case _ =>
- throw TableException(
- "Time condition may only consist of time attributes, literals, and arithmetic operators.")
- })
+ None
+ }
- (literals(0), literals(1))
+ (literals.head, literals(1))
}
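A sketch of the reduction, reviving the example from the removed documentation:

// predicate side: t2.proctime - INTERVAL '1' SECOND - INTERVAL '2' SECOND
// after replacing the time attribute by a zero literal: 0 - 1000 - 2000
// the ExpressionReducer folds it to the constant -3000, so the result
// for a left side of plain t1.proctime is (Some(0), Some(-3000)).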
-
/**
- * Generate other non-equi condition function
+ * Generates a JoinFunction that applies additional join predicates and projects the result.
*
* @param config table env config
 * @param joinType join type to determine whether input can be null
@@ -300,7 +398,7 @@ object WindowJoinUtil {
rightType: TypeInformation[Row],
returnType: RowSchema,
otherCondition: Option[RexNode],
- ruleDescription: String) = {
+ ruleDescription: String): GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row] = {
// whether input can be null
val nullCheck = joinType match {
@@ -311,7 +409,7 @@ object WindowJoinUtil {
}
// generate other non-equi function code
- val generator = new CodeGenerator(
+ val generator = new FunctionCodeGenerator(
config,
nullCheck,
leftType,
@@ -329,7 +427,9 @@ object WindowJoinUtil {
|${generator.collectorTerm}.collect(${conversion.resultTerm});
|""".stripMargin
case Some(remainCondition) =>
- val genCond = generator.generateExpression(remainCondition)
+ // map logical field accesses to physical accesses
+ val physicalCondition = returnType.mapRexNode(remainCondition)
+ val genCond = generator.generateExpression(physicalCondition)
s"""
|${genCond.code}
|if (${genCond.resultTerm}) {
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
deleted file mode 100644
index 769ba55..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.updateutils
-
-import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.plan.nodes.datastream._
-import _root_.scala.collection.JavaConverters._
-
-object UpdateCheckUtils {
-
- /** Validates that the plan produces only append changes. */
- def isAppendOnly(plan: RelNode): Boolean = {
- val appendOnlyValidator = new AppendOnlyValidator
- appendOnlyValidator.go(plan)
-
- appendOnlyValidator.isAppendOnly
- }
-
- /** Extracts the unique keys of the table produced by the plan. */
- def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
- val keyExtractor = new UniqueKeyExtractor
- keyExtractor.go(plan)
- keyExtractor.keys
- }
-
- private class AppendOnlyValidator extends RelVisitor {
-
- var isAppendOnly = true
-
- override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
- node match {
- case s: DataStreamRel if s.producesUpdates =>
- isAppendOnly = false
- case _ =>
- super.visit(node, ordinal, parent)
- }
- }
- }
-
- /** Identifies unique key fields in the output of a RelNode. */
- private class UniqueKeyExtractor extends RelVisitor {
-
- var keys: Option[Array[String]] = None
-
- override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
- node match {
- case c: DataStreamCalc =>
- super.visit(node, ordinal, parent)
- // check if input has keys
- if (keys.isDefined) {
- // track keys forward
- val inNames = c.getInput.getRowType.getFieldNames
- val inOutNames = c.getProgram.getNamedProjects.asScala
- .map(p => {
- c.getProgram.expandLocalRef(p.left) match {
- // output field is forwarded input field
- case i: RexInputRef => (i.getIndex, p.right)
- // output field is renamed input field
- case a: RexCall if a.getKind.equals(SqlKind.AS) =>
- a.getOperands.get(0) match {
- case ref: RexInputRef =>
- (ref.getIndex, p.right)
- case _ =>
- (-1, p.right)
- }
- // output field is not forwarded from input
- case _: RexNode => (-1, p.right)
- }
- })
- // filter all non-forwarded fields
- .filter(_._1 >= 0)
- // resolve names of input fields
- .map(io => (inNames.get(io._1), io._2))
-
- // filter by input keys
- val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
- // check if all keys have been preserved
- if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
- // all key have been preserved (but possibly renamed)
- keys = Some(outKeys.toArray)
- } else {
- // some (or all) keys have been removed. Keys are no longer unique and removed
- keys = None
- }
- }
- case _: DataStreamOverAggregate =>
- super.visit(node, ordinal, parent)
- // keys are always forwarded by Over aggregate
- case a: DataStreamGroupAggregate =>
- // get grouping keys
- val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
- keys = Some(groupKeys.toArray)
- case w: DataStreamGroupWindowAggregate =>
- // get grouping keys
- val groupKeys =
- w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
- // get window start and end time
- val windowStartEnd = w.getWindowProperties.map(_.name)
- // we have only a unique key if at least one window property is selected
- if (windowStartEnd.nonEmpty) {
- keys = Some(groupKeys ++ windowStartEnd)
- }
- case _: DataStreamRel =>
- // anything else does not forward keys or might duplicate key, so we can stop
- keys = None
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
deleted file mode 100644
index d4ff3f7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit._
-
-import scala.collection.mutable
-
-class JoinITCase extends StreamingWithStateTestBase {
-
- val data = List(
- (1L, 1, "Hello"),
- (2L, 2, "Hello"),
- (3L, 3, "Hello"),
- (4L, 4, "Hello"),
- (5L, 5, "Hello"),
- (6L, 6, "Hello"),
- (7L, 7, "Hello World"),
- (8L, 8, "Hello World"),
- (20L, 20, "Hello World"))
-
- /** test process time inner join **/
- @Test
- def testProcessTimeInnerJoin(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
-
- val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
- "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
-
- val data1 = new mutable.MutableList[(Int, Long, String)]
- data1.+=((1, 1L, "Hi1"))
- data1.+=((1, 2L, "Hi2"))
- data1.+=((1, 5L, "Hi3"))
- data1.+=((2, 7L, "Hi5"))
- data1.+=((1, 9L, "Hi6"))
- data1.+=((1, 8L, "Hi8"))
-
- val data2 = new mutable.MutableList[(Int, Long, String)]
- data2.+=((1, 1L, "HiHi"))
- data2.+=((2, 2L, "HeHe"))
-
- val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
- val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- }
-
- /** test process time inner join with other condition **/
- @Test
- def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
-
- val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
- "t1.proctime between t2.proctime - interval '5' second " +
- "and t2.proctime + interval '5' second " +
- "and t1.b > t2.b and t1.b + t2.b < 14"
-
- val data1 = new mutable.MutableList[(String, Long, String)]
- data1.+=(("1", 1L, "Hi1"))
- data1.+=(("1", 2L, "Hi2"))
- data1.+=(("1", 5L, "Hi3"))
- data1.+=(("2", 7L, "Hi5"))
- data1.+=(("1", 9L, "Hi6"))
- data1.+=(("1", 8L, "Hi8"))
-
- val data2 = new mutable.MutableList[(String, Long, String)]
- data2.+=(("1", 5L, "HiHi"))
- data2.+=(("2", 2L, "HeHe"))
-
- val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
- val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
deleted file mode 100644
index 15e8b89..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.join.WindowJoinUtil
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Assert._
-import org.junit.Test
-
-class JoinTest extends TableTestBase {
- private val streamUtil: StreamTableTestUtil = streamTestUtil()
- streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
- streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
-
- @Test
- def testProcessingTimeInnerJoin() = {
-
- val sqlQuery = "SELECT t1.a, t2.b " +
- "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " +
- "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour"
- val expected =
- unaryNode(
- "DataStreamCalc",
- binaryNode(
- "DataStreamWindowJoin",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "proctime")
- ),
- unaryNode(
- "DataStreamCalc",
- streamTableNode(1),
- term("select", "a", "b", "proctime")
- ),
- term("where",
- "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
- "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
- term("join", "a, proctime, a0, b, proctime0"),
- term("joinType", "InnerJoin")
- ),
- term("select", "a", "b")
- )
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- /** There should exist time conditions **/
- @Test(expected = classOf[TableException])
- def testWindowJoinUnExistTimeCondition() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a"
- streamUtil.verifySql(sql, "n/a")
- }
-
- /** There should exist exactly two time conditions **/
- @Test(expected = classOf[TableException])
- def testWindowJoinSingleTimeCondition() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
- " and t1.proctime > t2.proctime - interval '5' second"
- streamUtil.verifySql(sql, "n/a")
- }
-
- /** Both time attributes in a join condition must be of the same type **/
- @Test(expected = classOf[TableException])
- def testWindowJoinDiffTimeIndicator() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
- " and t1.proctime > t2.proctime - interval '5' second " +
- " and t1.proctime < t2.c + interval '5' second"
- streamUtil.verifySql(sql, "n/a")
- }
-
- /** The time conditions should be an And condition **/
- @Test(expected = classOf[TableException])
- def testWindowJoinNotCnfCondition() = {
- val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" +
- " and (t1.proctime > t2.proctime - interval '5' second " +
- " or t1.proctime < t2.c + interval '5' second)"
- streamUtil.verifySql(sql, "n/a")
- }
-
- @Test
- def testJoinTimeBoundary(): Unit = {
- verifyTimeBoundary(
- "t1.proctime between t2.proctime - interval '1' hour " +
- "and t2.proctime + interval '1' hour",
- -3600000,
- 3600000,
- "proctime")
-
- verifyTimeBoundary(
- "t1.proctime > t2.proctime - interval '1' second and " +
- "t1.proctime < t2.proctime + interval '1' second",
- -999,
- 999,
- "proctime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c - interval '1' second and " +
- "t1.c <= t2.c + interval '1' second",
- -1000,
- 1000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c and " +
- "t1.c <= t2.c + interval '1' second",
- 0,
- 1000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c + interval '1' second and " +
- "t1.c <= t2.c + interval '10' second",
- 1000,
- 10000,
- "rowtime")
-
- verifyTimeBoundary(
- "t2.c - interval '1' second <= t1.c and " +
- "t2.c + interval '10' second >= t1.c",
- -1000,
- 10000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c - interval '2' second >= t2.c + interval '1' second -" +
- "interval '10' second and " +
- "t1.c <= t2.c + interval '10' second",
- -7000,
- 10000,
- "rowtime")
-
- verifyTimeBoundary(
- "t1.c >= t2.c - interval '10' second and " +
- "t1.c <= t2.c - interval '5' second",
- -10000,
- -5000,
- "rowtime")
- }
-
- @Test
- def testJoinRemainConditionConvert(): Unit = {
- streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
- streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
- val query =
- "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
- "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
- "t1.c > t2.c"
- verifyRemainConditionConvert(
- query,
- ">($1, $3)")
-
- val query1 =
- "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
- "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
- verifyRemainConditionConvert(
- query1,
- "")
-
- streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
- streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
- val query2 =
- "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
- "t1.proctime >= t2.proctime - interval '10' second " +
- "and t1.proctime <= t2.proctime - interval '5' second and " +
- "t1.c > t2.c"
- verifyRemainConditionConvert(
- query2,
- ">($2, $5)")
- }
-
- def verifyTimeBoundary(
- timeSql: String,
- expLeftSize: Long,
- expRightSize: Long,
- expTimeType: String) = {
- val query =
- "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
-
- val resultTable = streamUtil.tEnv.sql(query)
- val relNode = resultTable.getRelNode
- val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
- val rexNode = joinNode.getCondition
- val (isRowTime, lowerBound, upperBound, conditionWithoutTime) =
- WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
- joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
- val timeTypeStr =
- if (isRowTime) "rowtime"
- else "proctime"
- assertEquals(expLeftSize, lowerBound)
- assertEquals(expRightSize, upperBound)
- assertEquals(expTimeType, timeTypeStr)
- }
-
- def verifyRemainConditionConvert(
- query: String,
- expectCondStr: String) = {
-
- val resultTable = streamUtil.tEnv.sql(query)
- val relNode = resultTable.getRelNode
- val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
- val joinInfo = joinNode.analyzeCondition
- val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
- val (isRowTime, lowerBound, upperBound, remainCondition) =
- WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
- joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
-
- val actual: String = remainCondition.getOrElse("").toString
-
- assertEquals(expectCondStr, actual)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
new file mode 100644
index 0000000..640fd26
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+ streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+ @Test
+ def testProcessingTimeInnerJoinWithOnClause() = {
+
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+ |""".stripMargin
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamWindowJoin",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "proctime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "proctime")
+ ),
+ term("where",
+ "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+ "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+ term("join", "a, proctime, a0, b, proctime0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testProcessingTimeInnerJoinWithWhereClause() = {
+
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+ |""".stripMargin
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamWindowJoin",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "proctime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "proctime")
+ ),
+ term("where",
+ "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+ "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+ term("join", "a, proctime, a0, b, proctime0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b0 AS b")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testJoinTimeBoundary(): Unit = {
+ verifyTimeBoundary(
+ "t1.proctime between t2.proctime - interval '1' hour " +
+ "and t2.proctime + interval '1' hour",
+ -3600000,
+ 3600000,
+ "proctime")
+
+ verifyTimeBoundary(
+ "t1.proctime > t2.proctime - interval '1' second and " +
+ "t1.proctime < t2.proctime + interval '1' second",
+ -999,
+ 999,
+ "proctime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c - interval '1' second and " +
+ "t1.c <= t2.c + interval '1' second",
+ -1000,
+ 1000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c and " +
+ "t1.c <= t2.c + interval '1' second",
+ 0,
+ 1000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c + interval '1' second and " +
+ "t1.c <= t2.c + interval '10' second",
+ 1000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t2.c - interval '1' second <= t1.c and " +
+ "t2.c + interval '10' second >= t1.c",
+ -1000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c - interval '2' second >= t2.c + interval '1' second -" +
+ "interval '10' second and " +
+ "t1.c <= t2.c + interval '10' second",
+ -7000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c >= t2.c - interval '10' second and " +
+ "t1.c <= t2.c - interval '5' second",
+ -10000,
+ -5000,
+ "rowtime")
+ }
+
+ @Test
+ def testJoinRemainConditionConvert(): Unit = {
+ streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+ streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+ val query =
+ "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+ "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " +
+ "t1.c > t2.c"
+ verifyRemainConditionConvert(
+ query,
+ ">($2, $6)")
+
+ val query1 =
+ "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " +
+ "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second "
+ verifyRemainConditionConvert(
+ query1,
+ "")
+
+ streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
+ streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
+ val query2 =
+ "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " +
+ "t1.proctime >= t2.proctime - interval '10' second " +
+ "and t1.proctime <= t2.proctime - interval '5' second and " +
+ "t1.c > t2.c"
+ verifyRemainConditionConvert(
+ query2,
+ ">($2, $6)")
+ }
+
+ private def verifyTimeBoundary(
+ timeSql: String,
+ expLeftSize: Long,
+ expRightSize: Long,
+ expTimeType: String): Unit = {
+ val query =
+ "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
+
+ val resultTable = streamUtil.tableEnv.sql(query)
+ val relNode = resultTable.getRelNode
+ val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+ val rexNode = joinNode.getCondition
+ val (windowBounds, _) =
+ WindowJoinUtil.extractWindowBoundsFromPredicate(
+ rexNode,
+ 4,
+ joinNode.getRowType,
+ joinNode.getCluster.getRexBuilder,
+ streamUtil.tableEnv.getConfig)
+
+ val timeTypeStr =
+ if (windowBounds.get.isEventTime) "rowtime"
+ else "proctime"
+ assertEquals(expLeftSize, windowBounds.get.leftLowerBound)
+ assertEquals(expRightSize, windowBounds.get.leftUpperBound)
+ assertEquals(expTimeType, timeTypeStr)
+ }
+
+ private def verifyRemainConditionConvert(
+ query: String,
+ expectCondStr: String): Unit = {
+
+ val resultTable = streamUtil.tableEnv.sql(query)
+ val relNode = resultTable.getRelNode
+ val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+ val joinInfo = joinNode.analyzeCondition
+ val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+ val (_, remainCondition) =
+ WindowJoinUtil.extractWindowBoundsFromPredicate(
+ rexNode,
+ 4,
+ joinNode.getRowType,
+ joinNode.getCluster.getRexBuilder,
+ streamUtil.tableEnv.getConfig)
+
+ val actual: String = remainCondition.getOrElse("").toString
+
+ assertEquals(expectCondStr, actual)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..9cce37e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class JoinValidationTest extends TableTestBase {
+
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+ streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+ /** A window join requires time conditions. **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinUnExistTimeCondition() = {
+ val sql =
+ """
+ |SELECT t2.a
+ |FROM MyTable t1 JOIN MyTable2 t2 ON t1.a = t2.a""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** A window join requires exactly two time conditions. **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinSingleTimeCondition() = {
+ val sql =
+ """
+ |SELECT t2.a
+ |FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime > t2.proctime - INTERVAL '5' SECOND""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** Both time attributes in a join condition must be of the same type **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinDiffTimeIndicator(): Unit = {
+ val sql =
+ """
+ |SELECT t2.a FROM
+ |MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+ | t1.proctime < t2.c + INTERVAL '5' SECOND""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** The time conditions must be conjunctive, i.e., combined with AND, not OR. **/
+ @Test(expected = classOf[TableException])
+ def testWindowJoinNotCnfCondition(): Unit = {
+ val sql =
+ """
+ |SELECT t2.a
+ |FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.a = t2.a AND
+ | (t1.proctime > t2.proctime - INTERVAL '5' SECOND OR
+ | t1.proctime < t2.c + INTERVAL '5' SECOND)""".stripMargin
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** Validates that queries with a rowtime attribute in the output schema are rejected. **/
+ @Test(expected = classOf[TableException])
+ def testNoRowtimeAttributeInResult(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND t2.proctime
+ | """.stripMargin
+
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+}
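For contrast with the five rejected queries above, a query of the following shape passes validation against the same table definitions (a sketch): an equi-join plus conjunctive lower and upper bounds on time attributes of the same kind, with no rowtime attribute in the projection.

    val validSql =
      """
        |SELECT t2.a
        |FROM MyTable t1 JOIN MyTable2 t2 ON
        |  t1.a = t2.a AND
        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND
        |    AND t2.proctime + INTERVAL '5' SECOND""".stripMargin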
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 7885160..90c8ea4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
"DataStreamCalc",
streamTableNode(0),
term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
- term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+ term("where", ">(rowtime, 1990-12-02 12:11:11)")
)
util.verifyTable(result, expected)
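The updated expectation reflects that a comparison between a time indicator and a timestamp literal no longer needs TIME_MATERIALIZATION inside the predicate; only the projected indicator is materialized. The query under test is roughly of this shape (a sketch; the exact table definition is in TimeIndicatorConversionTest):

    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
    val result = t
      .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
      .select('rowtime)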
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index c008ed3..6c24c5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -20,20 +20,18 @@ package org.apache.flink.table.runtime.harness
import java.util.concurrent.ConcurrentLinkedQueue
import java.lang.{Integer => JInt}
-import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness}
-import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.junit.Test
-
+import org.junit.Assert.{assertEquals, assertTrue}
class JoinHarnessTest extends HarnessTestBase{
@@ -89,7 +87,7 @@ class JoinHarnessTest extends HarnessTestBase{
new TupleRowKeySelector[Integer](0),
new TupleRowKeySelector[Integer](0),
BasicTypeInfo.INT_TYPE_INFO,
- 1,1,0)
+ 1, 1, 0)
testHarness.open()
@@ -97,16 +95,16 @@ class JoinHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa"), true), 1))
- assert(testHarness.numProcessingTimeTimers() == 1)
+ assertEquals(1, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(2: JInt, "bbb"), true), 2))
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa2"), true), 3))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// input on the right stream is processed and join output is emitted normally
testHarness.processElement2(new StreamRecord(
@@ -114,20 +112,20 @@ class JoinHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(4)
testHarness.processElement2(new StreamRecord(
CRow(Row.of(2: JInt, "Hello1"), true), 4))
- assert(testHarness.numKeyedStateEntries() == 8)
- assert(testHarness.numProcessingTimeTimers() == 4)
+ assertEquals(8, testHarness.numKeyedStateEntries())
+ assertEquals(4, testHarness.numProcessingTimeTimers())
// the left stream record received at time 1 is now outside the join window
testHarness.setProcessingTime(12)
- assert(testHarness.numKeyedStateEntries() == 8)
- assert(testHarness.numProcessingTimeTimers() == 4)
+ assertEquals(8, testHarness.numKeyedStateEntries())
+ assertEquals(4, testHarness.numProcessingTimeTimers())
testHarness.processElement2(new StreamRecord(
CRow(Row.of(1: JInt, "Hi2"), true), 12))
// the right stream record received at time 4 and all left stream records are expired from state
testHarness.setProcessingTime(25)
- assert(testHarness.numKeyedStateEntries() == 2)
- assert(testHarness.numProcessingTimeTimers() == 1)
+ assertEquals(2, testHarness.numKeyedStateEntries())
+ assertEquals(1, testHarness.numProcessingTimeTimers())
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa3"), true), 25))
testHarness.processElement1(new StreamRecord(
@@ -136,9 +134,9 @@ class JoinHarnessTest extends HarnessTestBase{
CRow(Row.of(2: JInt, "Hello2"), true), 25))
testHarness.setProcessingTime(45)
- assert(testHarness.numKeyedStateEntries() > 0)
+ assertTrue(testHarness.numKeyedStateEntries() > 0)
testHarness.setProcessingTime(46)
- assert(testHarness.numKeyedStateEntries() == 0)
+ assertEquals(0, testHarness.numKeyedStateEntries())
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -175,7 +173,7 @@ class JoinHarnessTest extends HarnessTestBase{
new TupleRowKeySelector[Integer](0),
new TupleRowKeySelector[Integer](0),
BasicTypeInfo.INT_TYPE_INFO,
- 1,1,0)
+ 1, 1, 0)
testHarness.open()
@@ -188,37 +186,38 @@ class JoinHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
CRow(Row.of(1: JInt, "aaa3"), true), 3))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// b elements are never stored; this record does not satisfy
// a.proctime <= b.proctime - 5, so no output is produced
testHarness.processElement2(new StreamRecord(
CRow(Row.of(1: JInt, "bbb3"), true), 3))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// a.proctime <= b.proctime - 5 is now met, so output is produced
testHarness.setProcessingTime(7)
testHarness.processElement2(new StreamRecord(
CRow(Row.of(2: JInt, "bbb7"), true), 7))
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
// the stream a record received at time 1 falls out of the join window
testHarness.setProcessingTime(12)
- assert(testHarness.numKeyedStateEntries() == 4)
- assert(testHarness.numProcessingTimeTimers() == 2)
+ assertEquals(4, testHarness.numKeyedStateEntries())
+ assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.processElement2(new StreamRecord(
CRow(Row.of(1: JInt, "bbb12"), true), 12))
testHarness.setProcessingTime(13)
- assert(testHarness.numKeyedStateEntries() == 2)
- assert(testHarness.numProcessingTimeTimers() == 1)
+ assertEquals(2, testHarness.numKeyedStateEntries())
+ assertEquals(1, testHarness.numProcessingTimeTimers())
- testHarness.setProcessingTime(14)
- assert(testHarness.numKeyedStateEntries() == 0)
- assert(testHarness.numProcessingTimeTimers() == 0)
+ // state must be cleaned up after the window interval has passed with no new rows arriving.
+ testHarness.setProcessingTime(23)
+ assertEquals(0, testHarness.numKeyedStateEntries())
+ assertEquals(0, testHarness.numProcessingTimeTimers())
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
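All assertions in this harness test follow one pattern: advance processing time explicitly, feed CRow records into either input, and inspect timers, keyed state, and output. Below is a condensed sketch of that pattern; `operator` is a placeholder for the two-input operator under test (e.g. a KeyedCoProcessOperator wrapping ProcTimeWindowInnerJoin, construction elided), and the trailing 1, 1, 0 appear to be max parallelism, subtask count, and subtask index:

    val testHarness = new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
      operator,
      new TupleRowKeySelector[Integer](0),  // key selector, left input
      new TupleRowKeySelector[Integer](0),  // key selector, right input
      BasicTypeInfo.INT_TYPE_INFO,          // key type
      1, 1, 0)
    testHarness.open()
    testHarness.setProcessingTime(1)
    testHarness.processElement1(new StreamRecord(CRow(Row.of(1: JInt, "left"), true), 1))
    testHarness.processElement2(new StreamRecord(CRow(Row.of(1: JInt, "right"), true), 1))
    val output = testHarness.getOutput     // joined CRow records in emission order
    testHarness.close()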
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
new file mode 100644
index 0000000..ab7925b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+ /** Tests a processing-time inner join. **/
+ @Test
+ def testProcessTimeInnerJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+ "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
+
+ val data1 = new mutable.MutableList[(Int, Long, String)]
+ data1.+=((1, 1L, "Hi1"))
+ data1.+=((1, 2L, "Hi2"))
+ data1.+=((1, 5L, "Hi3"))
+ data1.+=((2, 7L, "Hi5"))
+ data1.+=((1, 9L, "Hi6"))
+ data1.+=((1, 8L, "Hi8"))
+
+ val data2 = new mutable.MutableList[(Int, Long, String)]
+ data2.+=((1, 1L, "HiHi"))
+ data2.+=((2, 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+ /** Tests a processing-time inner join with additional non-time join conditions. **/
+ @Test
+ def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
+ "t1.proctime between t2.proctime - interval '5' second " +
+ "and t2.proctime + interval '5' second " +
+ "and t1.b > t2.b and t1.b + t2.b < 14"
+
+ val data1 = new mutable.MutableList[(String, Long, String)]
+ data1.+=(("1", 1L, "Hi1"))
+ data1.+=(("1", 2L, "Hi2"))
+ data1.+=(("1", 5L, "Hi3"))
+ data1.+=(("2", 7L, "Hi5"))
+ data1.+=(("1", 9L, "Hi6"))
+ data1.+=(("1", 8L, "Hi8"))
+
+ val data2 = new mutable.MutableList[(String, Long, String)]
+ data2.+=(("1", 5L, "HiHi"))
+ data2.+=(("2", 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+}
+
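Note that both IT cases only execute the query and sink the rows; they do not assert on the produced output, because processing-time attributes are assigned from the wall clock, which makes the joined result nondeterministic. If the result were deterministic (e.g. in an event-time variant), a check in the usual StreamITCase style would look like this; the expected rows below are illustrative, not the actual output of the queries above:

    import org.junit.Assert.assertEquals

    val expected = mutable.MutableList("1,HiHi,Hi1", "1,HiHi,Hi2")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)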