Posted to commits@flink.apache.org by fh...@apache.org on 2017/07/18 07:40:08 UTC
[3/3] flink git commit: [FLINK-6232] [table] Add SQL documentation for time window join.
[FLINK-6232] [table] Add SQL documentation for time window join.
- Add support for window join predicates in WHERE clause.
- Refactoring of WindowJoinUtil.
- Minor refactorings of join classes.
This closes #4324.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/471345c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/471345c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/471345c0
Branch: refs/heads/master
Commit: 471345c0ea9930a582786cc08bd290d374b28c5a
Parents: ba6c59e
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 13 00:49:30 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 18 00:57:37 2017 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 29 ++
.../table/api/StreamTableEnvironment.scala | 10 +-
.../calcite/RelTimeIndicatorConverter.scala | 166 ++++----
.../flink/table/plan/nodes/CommonCalc.scala | 25 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 12 +-
.../table/plan/nodes/dataset/DataSetJoin.scala | 2 +-
.../plan/nodes/datastream/DataStreamCalc.scala | 25 +-
.../nodes/datastream/DataStreamWindowJoin.scala | 40 +-
.../datastream/DataStreamWindowJoinRule.scala | 55 ++-
.../table/plan/util/UpdatingPlanChecker.scala | 129 ++++++
.../runtime/join/ProcTimeWindowInnerJoin.scala | 134 +++---
.../table/runtime/join/WindowJoinUtil.scala | 408 ++++++++++++-------
.../table/updateutils/UpdateCheckUtils.scala | 128 ------
.../table/api/scala/stream/sql/JoinITCase.scala | 117 ------
.../table/api/scala/stream/sql/JoinTest.scala | 235 -----------
.../flink/table/api/stream/sql/JoinTest.scala | 250 ++++++++++++
.../sql/validation/JoinValidationTest.scala | 95 +++++
.../plan/TimeIndicatorConversionTest.scala | 2 +-
.../table/runtime/harness/JoinHarnessTest.scala | 61 ++-
.../table/runtime/stream/sql/JoinITCase.scala | 106 +++++
20 files changed, 1167 insertions(+), 862 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 81aa3d9..e677b60 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -369,6 +369,35 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
</td>
</tr>
<tr>
+ <td><strong>Time-windowed Join</strong><br>
+ <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Streaming</span>
+ </td>
+ <td>
+ <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
+
+ <p>A time-windowed join requires a special join condition that bounds the time on both sides. This can be done with either two appropriate range predicates (<code> <, <=, >=, ></code>) or a single <code>BETWEEN</code> predicate that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply to time predicates:
+ <ul>
+ <li>Time predicates must compare time attributes of both input tables.</li>
+ <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
+ <li>Only range predicates are valid time predicates.</li>
+ <li>Non-time predicates must not access a time attribute.</li>
+ </ul>
+ </p>
+
+ <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins on processing time attributes are supported.</p>
+
+{% highlight sql %}
+SELECT *
+FROM Orders o, Shipments s
+WHERE o.id = s.orderId AND
+ o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+The example above joins all orders with their corresponding shipments if the order was shipped within four hours after it was received.
+ </td>
+ </tr>
+ <tr>
<td>
<strong>Expanding arrays into a relation</strong><br>
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f026824..669b017 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -45,12 +45,12 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
-import org.apache.flink.table.updateutils.UpdateCheckUtils
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
@@ -174,10 +174,10 @@ abstract class StreamTableEnvironment(
// optimize plan
val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
// check for append only table
- val isAppendOnlyTable = UpdateCheckUtils.isAppendOnly(optimizedPlan)
+ val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(optimizedPlan)
upsertSink.setIsAppendOnly(isAppendOnlyTable)
// extract unique key fields
- val tableKeys: Option[Array[String]] = UpdateCheckUtils.getUniqueKeyFields(optimizedPlan)
+ val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
// check that we have keys if the table has changes (is not append-only)
tableKeys match {
case Some(keys) => upsertSink.setKeyFields(keys)
@@ -201,7 +201,7 @@ abstract class StreamTableEnvironment(
// optimize plan
val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
// verify table is an insert-only (append-only) table
- if (!UpdateCheckUtils.isAppendOnly(optimizedPlan)) {
+ if (!UpdatingPlanChecker.isAppendOnly(optimizedPlan)) {
throw new TableException(
"AppendStreamTableSink requires that Table has only insert changes.")
}
@@ -651,7 +651,7 @@ abstract class StreamTableEnvironment(
(implicit tpe: TypeInformation[A]): DataStream[A] = {
// if no change flags are requested, verify table is an insert-only (append-only) table.
- if (!withChangeFlag && !UpdateCheckUtils.isAppendOnly(logicalPlan)) {
+ if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(logicalPlan)) {
throw new TableException(
"Table is not an append-only table. " +
"Use the toRetractStream() in order to handle add and retract messages.")
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 32d6f01..d76613e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -42,9 +42,9 @@ import scala.collection.mutable
class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
private val timestamp = rexBuilder
- .getTypeFactory
- .asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ .getTypeFactory
+ .asInstanceOf[FlinkTypeFactory]
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
override def visit(intersect: LogicalIntersect): RelNode =
throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -138,15 +138,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
// visit children and update inputs
val input = filter.getInput.accept(this)
- // check if input field contains time indicator type
- // materialize field if no time indicator is present anymore
- // if input field is already materialized, change to timestamp type
- val materializer = new RexTimeIndicatorMaterializer(
- rexBuilder,
- input.getRowType.getFieldList.map(_.getType))
-
- val condition = filter.getCondition.accept(materializer)
- LogicalFilter.create(input, condition)
+ // We do not materialize time indicators in conditions because they can be locally evaluated.
+ // Some conditions are evaluated by special operators (e.g., time window joins).
+ // Time indicators in remaining conditions are materialized by Calc before the code generation.
+ LogicalFilter.create(input, filter.getCondition)
}
override def visit(project: LogicalProject): RelNode = {
@@ -306,68 +301,6 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
updatedAggCalls)
}
- class RexTimeIndicatorMaterializer(
- private val rexBuilder: RexBuilder,
- private val input: Seq[RelDataType])
- extends RexShuttle {
-
- override def visitInputRef(inputRef: RexInputRef): RexNode = {
- // reference is interesting
- if (isTimeIndicatorType(inputRef.getType)) {
- val resolvedRefType = input(inputRef.getIndex)
- // input is a valid time indicator
- if (isTimeIndicatorType(resolvedRefType)) {
- inputRef
- }
- // input has been materialized
- else {
- new RexInputRef(inputRef.getIndex, resolvedRefType)
- }
- }
- // reference is a regular field
- else {
- super.visitInputRef(inputRef)
- }
- }
-
- override def visitCall(call: RexCall): RexNode = {
- val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
-
- // materialize operands with time indicators
- val materializedOperands = updatedCall.getOperator match {
-
- // skip materialization for special operators
- case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
- updatedCall.getOperands.toList
-
- case _ =>
- updatedCall.getOperands.map { o =>
- if (isTimeIndicatorType(o.getType)) {
- rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
- } else {
- o
- }
- }
- }
-
- // remove time indicator return type
- updatedCall.getOperator match {
-
- // we do not modify AS if operand has not been materialized
- case SqlStdOperatorTable.AS if
- isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
- updatedCall
-
- // materialize function's result and operands
- case _ if isTimeIndicatorType(updatedCall.getType) =>
- updatedCall.clone(timestamp, materializedOperands)
-
- // materialize function's operands only
- case _ =>
- updatedCall.clone(updatedCall.getType, materializedOperands)
- }
- }
- }
}
object RelTimeIndicatorConverter {
@@ -421,4 +354,89 @@ object RelTimeIndicatorConverter {
new RelRecordType(fields)
}
+
+ /**
+ * Materializes time indicator accesses in an expression.
+ *
+ * @param expr The expression in which time indicators are materialized.
+ * @param rowType The input schema of the expression.
+ * @param rexBuilder A RexBuilder.
+ *
+ * @return The expression with materialized time indicators.
+ */
+ def convertExpression(expr: RexNode, rowType: RelDataType, rexBuilder: RexBuilder): RexNode = {
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ rowType.getFieldList.map(_.getType))
+
+ expr.accept(materializer)
+ }
+}
+
+class RexTimeIndicatorMaterializer(
+ private val rexBuilder: RexBuilder,
+ private val input: Seq[RelDataType])
+ extends RexShuttle {
+
+ private val timestamp = rexBuilder
+ .getTypeFactory
+ .asInstanceOf[FlinkTypeFactory]
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ // reference is interesting
+ if (isTimeIndicatorType(inputRef.getType)) {
+ val resolvedRefType = input(inputRef.getIndex)
+ // input is a valid time indicator
+ if (isTimeIndicatorType(resolvedRefType)) {
+ inputRef
+ }
+ // input has been materialized
+ else {
+ new RexInputRef(inputRef.getIndex, resolvedRefType)
+ }
+ }
+ // reference is a regular field
+ else {
+ super.visitInputRef(inputRef)
+ }
+ }
+
+ override def visitCall(call: RexCall): RexNode = {
+ val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+ // materialize operands with time indicators
+ val materializedOperands = updatedCall.getOperator match {
+
+ // skip materialization for special operators
+ case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+ updatedCall.getOperands.toList
+
+ case _ =>
+ updatedCall.getOperands.map { o =>
+ if (isTimeIndicatorType(o.getType)) {
+ rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+ } else {
+ o
+ }
+ }
+ }
+
+ // remove time indicator return type
+ updatedCall.getOperator match {
+
+ // we do not modify AS if operand has not been materialized
+ case SqlStdOperatorTable.AS if
+ isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
+ updatedCall
+
+ // materialize function's result and operands
+ case _ if isTimeIndicatorType(updatedCall.getType) =>
+ updatedCall.clone(timestamp, materializedOperands)
+
+ // materialize function's operands only
+ case _ =>
+ updatedCall.clone(updatedCall.getType, materializedOperands)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 693924e..3e355ff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -22,12 +22,10 @@ import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
import org.apache.calcite.rex._
import org.apache.flink.api.common.functions.Function
import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.types.Row
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
trait CommonCalc {
@@ -37,39 +35,26 @@ trait CommonCalc {
ruleDescription: String,
inputSchema: RowSchema,
returnSchema: RowSchema,
- calcProgram: RexProgram,
+ calcProjection: Seq[RexNode],
+ calcCondition: Option[RexNode],
config: TableConfig,
functionClass: Class[T]):
GeneratedFunction[T, Row] = {
- val expandedExpressions = calcProgram
- .getProjectList
- .map(expr => calcProgram.expandLocalRef(expr))
- // time indicator fields must not be part of the code generation
- .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
- // update indices
- .map(expr => inputSchema.mapRexNode(expr))
-
- val condition = if (calcProgram.getCondition != null) {
- inputSchema.mapRexNode(calcProgram.expandLocalRef(calcProgram.getCondition))
- } else {
- null
- }
-
val projection = generator.generateResultExpression(
returnSchema.physicalTypeInfo,
returnSchema.physicalFieldNames,
- expandedExpressions)
+ calcProjection)
// only projection
- val body = if (condition == null) {
+ val body = if (calcCondition.isEmpty) {
s"""
|${projection.code}
|${generator.collectorTerm}.collect(${projection.resultTerm});
|""".stripMargin
}
else {
- val filterCondition = generator.generateExpression(condition)
+ val filterCondition = generator.generateExpression(calcCondition.get)
// only filter
if (projection == null) {
s"""
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index a923acc..7417eb2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -35,6 +35,8 @@ import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.FlatMapRunner
import org.apache.flink.types.Row
+import scala.collection.JavaConverters._
+
/**
* Flink RelNode which matches along with LogicalCalc.
*
@@ -90,12 +92,20 @@ class DataSetCalc(
val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+ val projection = calcProgram.getProjectList.asScala.map(calcProgram.expandLocalRef)
+ val condition = if (calcProgram.getCondition != null) {
+ Some(calcProgram.expandLocalRef(calcProgram.getCondition))
+ } else {
+ None
+ }
+
val genFunction = generateFunction(
generator,
ruleDescription,
new RowSchema(getInput.getRowType),
new RowSchema(getRowType),
- calcProgram,
+ projection,
+ condition,
config,
classOf[FlatMapFunction[Row, Row]])
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index e8f3b82..1583e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -58,7 +58,7 @@ class DataSetJoin(
with CommonJoin
with DataSetRel {
- override def deriveRowType() = rowRelDataType
+ override def deriveRowType(): RelDataType = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetJoin(
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index d626c46..2e00330 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -28,11 +28,14 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
import org.apache.flink.table.plan.nodes.CommonCalc
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.CRowProcessRunner
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import scala.collection.JavaConverters._
+
/**
* Flink RelNode which matches along with FlatMapOperator.
*
@@ -93,6 +96,25 @@ class DataStreamCalc(
val inputDataStream =
getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+ // materialize time attributes in condition
+ val condition = if (calcProgram.getCondition != null) {
+ val materializedCondition = RelTimeIndicatorConverter.convertExpression(
+ calcProgram.expandLocalRef(calcProgram.getCondition),
+ inputSchema.logicalType,
+ cluster.getRexBuilder)
+ Some(materializedCondition)
+ } else {
+ None
+ }
+
+ // filter out time attributes
+ val projection = calcProgram.getProjectList.asScala
+ .map(calcProgram.expandLocalRef)
+ // time indicator fields must not be part of the code generation
+ .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
+ // update indices
+ .map(expr => inputSchema.mapRexNode(expr))
+
val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo)
val genFunction = generateFunction(
@@ -100,7 +122,8 @@ class DataStreamCalc(
ruleDescription,
inputSchema,
schema,
- calcProgram,
+ projection,
+ condition,
config,
classOf[ProcessFunction[CRow, CRow]])
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 1315a79..987947c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.calcite.rex.RexNode
@@ -27,12 +28,12 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.plan.nodes.CommonJoin
import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.updateutils.UpdateCheckUtils
/**
- * Flink RelNode which matches along with JoinOperator and its related operations.
+ * RelNode for a time-windowed stream join.
*/
class DataStreamWindowJoin(
cluster: RelOptCluster,
@@ -53,7 +54,7 @@ class DataStreamWindowJoin(
with CommonJoin
with DataStreamRel {
- override def deriveRowType() = schema.logicalType
+ override def deriveRowType(): RelDataType = schema.logicalType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamWindowJoin(
@@ -96,8 +97,8 @@ class DataStreamWindowJoin(
val config = tableEnv.getConfig
- val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
- val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+ val isLeftAppendOnly = UpdatingPlanChecker.isAppendOnly(left)
+ val isRightAppendOnly = UpdatingPlanChecker.isAppendOnly(right)
if (!isLeftAppendOnly || !isRightAppendOnly) {
throw new TableException(
"Windowed stream join does not support updates.")
@@ -124,21 +125,20 @@ class DataStreamWindowJoin(
joinType match {
case JoinRelType.INNER =>
- isRowTime match {
- case false =>
- // Proctime JoinCoProcessFunction
- createProcTimeInnerJoinFunction(
- leftDataStream,
- rightDataStream,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
- )
- case true =>
- // RowTime JoinCoProcessFunction
- throw new TableException(
- "RowTime inner join between stream and stream is not supported yet.")
+ if (isRowTime) {
+ // RowTime JoinCoProcessFunction
+ throw new TableException(
+ "RowTime inner join between stream and stream is not supported yet.")
+ } else {
+ // Proctime JoinCoProcessFunction
+ createProcTimeInnerJoinFunction(
+ leftDataStream,
+ rightDataStream,
+ joinFunction.name,
+ joinFunction.code,
+ leftKeys,
+ rightKeys
+ )
}
case JoinRelType.FULL =>
throw new TableException(
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
index c7a190f..2075689 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -21,13 +21,16 @@ package org.apache.flink.table.plan.rules.datastream
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamWindowJoin
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.join.WindowJoinUtil
+import scala.collection.JavaConverters._
+
class DataStreamWindowJoinRule
extends ConverterRule(
classOf[FlinkLogicalJoin],
@@ -39,18 +42,36 @@ class DataStreamWindowJoinRule
val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
val joinInfo = join.analyzeCondition
- try {
- WindowJoinUtil.analyzeTimeBoundary(
- joinInfo.getRemaining(join.getCluster.getRexBuilder),
- join.getLeft.getRowType.getFieldCount,
- new RowSchema(join.getRowType),
- join.getCluster.getRexBuilder,
- TableConfig.DEFAULT)
- true
- } catch {
- case _: TableException =>
+ val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+ joinInfo.getRemaining(join.getCluster.getRexBuilder),
+ join.getLeft.getRowType.getFieldCount,
+ join.getRowType,
+ join.getCluster.getRexBuilder,
+ TableConfig.DEFAULT)
+
+ // remaining predicate must not access time attributes
+ val remainingPredsAccessTime = remainingPreds.isDefined &&
+ WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
+
+ if (windowBounds.isDefined) {
+ if (windowBounds.get.isEventTime) {
+ // we cannot handle event-time window joins yet
false
+ } else {
+ // Check that no event-time attributes are in the input.
+ // The proc-time join implementation ensures that record timestamps are correctly set.
+ // It is always the timestamp of the later arriving record.
+ // We rely on projection pushdown to remove unused attributes before the join.
+ val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+ .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+ !remainingPredsAccessTime && !rowTimeAttrInOutput
+ }
+ } else {
+ // the given join does not have valid window bounds. We cannot translate it.
+ false
}
+
}
override def convert(rel: RelNode): RelNode = {
@@ -63,11 +84,11 @@ class DataStreamWindowJoinRule
val leftRowSchema = new RowSchema(convLeft.getRowType)
val rightRowSchema = new RowSchema(convRight.getRowType)
- val (isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition) =
- WindowJoinUtil.analyzeTimeBoundary(
+ val (windowBounds, remainCondition) =
+ WindowJoinUtil.extractWindowBoundsFromPredicate(
joinInfo.getRemaining(join.getCluster.getRexBuilder),
leftRowSchema.logicalArity,
- new RowSchema(join.getRowType),
+ join.getRowType,
join.getCluster.getRexBuilder,
TableConfig.DEFAULT)
@@ -81,9 +102,9 @@ class DataStreamWindowJoinRule
leftRowSchema,
rightRowSchema,
new RowSchema(rel.getRowType),
- isRowTime,
- leftLowerBoundary,
- leftUpperBoundary,
+ windowBounds.get.isEventTime,
+ windowBounds.get.leftLowerBound,
+ windowBounds.get.leftUpperBound,
remainCondition,
description)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
new file mode 100644
index 0000000..6a160f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.datastream._
+
+import _root_.scala.collection.JavaConverters._
+
+object UpdatingPlanChecker {
+
+ /** Validates that the plan produces only append changes. */
+ def isAppendOnly(plan: RelNode): Boolean = {
+ val appendOnlyValidator = new AppendOnlyValidator
+ appendOnlyValidator.go(plan)
+
+ appendOnlyValidator.isAppendOnly
+ }
+
+ /** Extracts the unique keys of the table produced by the plan. */
+ def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+ val keyExtractor = new UniqueKeyExtractor
+ keyExtractor.go(plan)
+ keyExtractor.keys
+ }
+
+ private class AppendOnlyValidator extends RelVisitor {
+
+ var isAppendOnly = true
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case s: DataStreamRel if s.producesUpdates =>
+ isAppendOnly = false
+ case _ =>
+ super.visit(node, ordinal, parent)
+ }
+ }
+ }
+
+ /** Identifies unique key fields in the output of a RelNode. */
+ private class UniqueKeyExtractor extends RelVisitor {
+
+ var keys: Option[Array[String]] = None
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case c: DataStreamCalc =>
+ super.visit(node, ordinal, parent)
+ // check if input has keys
+ if (keys.isDefined) {
+ // track keys forward
+ val inNames = c.getInput.getRowType.getFieldNames
+ val inOutNames = c.getProgram.getNamedProjects.asScala
+ .map(p => {
+ c.getProgram.expandLocalRef(p.left) match {
+ // output field is forwarded input field
+ case i: RexInputRef => (i.getIndex, p.right)
+ // output field is renamed input field
+ case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+ a.getOperands.get(0) match {
+ case ref: RexInputRef =>
+ (ref.getIndex, p.right)
+ case _ =>
+ (-1, p.right)
+ }
+ // output field is not forwarded from input
+ case _: RexNode => (-1, p.right)
+ }
+ })
+ // filter all non-forwarded fields
+ .filter(_._1 >= 0)
+ // resolve names of input fields
+ .map(io => (inNames.get(io._1), io._2))
+
+ // filter by input keys
+ val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+ // check if all keys have been preserved
+ if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+ // all keys have been preserved (but possibly renamed)
+ keys = Some(outKeys.toArray)
+ } else {
+ // some (or all) keys have been removed. The output no longer has a unique key.
+ keys = None
+ }
+ }
+ case _: DataStreamOverAggregate =>
+ super.visit(node, ordinal, parent)
+ // keys are always forwarded by Over aggregate
+ case a: DataStreamGroupAggregate =>
+ // get grouping keys
+ val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+ keys = Some(groupKeys.toArray)
+ case w: DataStreamGroupWindowAggregate =>
+ // get grouping keys
+ val groupKeys =
+ w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+ // get window start and end time
+ val windowStartEnd = w.getWindowProperties.map(_.name)
+ // we have only a unique key if at least one window property is selected
+ if (windowStartEnd.nonEmpty) {
+ keys = Some(groupKeys ++ windowStartEnd)
+ }
+ case _: DataStreamRel =>
+ // anything else does not forward keys or might duplicate keys, so we can stop
+ keys = None
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
index 97d5ccc..8a3ba43 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -59,24 +59,26 @@ class ProcTimeWindowInnerJoin(
private var cRowWrapper: CRowWrappingCollector = _
- /** other condition function **/
+ // other condition function
private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
- /** tmp list to store expired records **/
- private var listToRemove: JList[Long] = _
+ // tmp list to store expired records
+ private var removeList: JList[Long] = _
- /** state to hold left stream element **/
+ // state to hold left stream element
private var row1MapState: MapState[Long, JList[Row]] = _
- /** state to hold right stream element **/
+ // state to hold right stream element
private var row2MapState: MapState[Long, JList[Row]] = _
- /** state to record last timer of left stream, 0 means no timer **/
+ // state to record last timer of left stream, 0 means no timer
private var timerState1: ValueState[Long] = _
- /** state to record last timer of right stream, 0 means no timer **/
+ // state to record last timer of right stream, 0 means no timer
private var timerState2: ValueState[Long] = _
- private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
- private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
+ // compute window sizes, i.e., how long to keep rows in state.
+ // window size of -1 means rows do not need to be put into state.
+ private val leftStreamWinSize: Long = if (leftLowerBound <= 0) -leftLowerBound else -1
+ private val rightStreamWinSize: Long = if (leftUpperBound >= 0) leftUpperBound else -1
val LOG = LoggerFactory.getLogger(this.getClass)
@@ -90,7 +92,7 @@ class ProcTimeWindowInnerJoin(
LOG.debug("Instantiating JoinFunction.")
joinFunction = clazz.newInstance()
- listToRemove = new util.ArrayList[Long]()
+ removeList = new util.ArrayList[Long]()
cRowWrapper = new CRowWrappingCollector()
cRowWrapper.setChange(true)
@@ -140,7 +142,7 @@ class ProcTimeWindowInnerJoin(
row2MapState,
-leftUpperBound, // right stream lower
-leftLowerBound, // right stream upper
- true
+ isLeft = true
)
}
@@ -165,9 +167,9 @@ class ProcTimeWindowInnerJoin(
timerState2,
row2MapState,
row1MapState,
- leftLowerBound, // left stream upper
+ leftLowerBound, // left stream lower
leftUpperBound, // left stream upper
- false
+ isLeft = false
)
}
@@ -217,66 +219,82 @@ class ProcTimeWindowInnerJoin(
winSize: Long,
timerState: ValueState[Long],
rowMapState: MapState[Long, JList[Row]],
- oppoRowMapState: MapState[Long, JList[Row]],
- oppoLowerBound: Long,
- oppoUpperBound: Long,
+ otherRowMapState: MapState[Long, JList[Row]],
+ otherLowerBound: Long,
+ otherUpperBound: Long,
isLeft: Boolean): Unit = {
cRowWrapper.out = out
- val value = valueC.row
+ val row = valueC.row
val curProcessTime = ctx.timerService.currentProcessingTime
- val oppoLowerTime = curProcessTime + oppoLowerBound
- val oppoUpperTime = curProcessTime + oppoUpperBound
-
- // only when windowsize != 0, we need to store the element
- if (winSize != 0) {
- // register a timer to expire the element
- if (timerState.value == 0) {
- ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
- timerState.update(curProcessTime + winSize + 1)
- }
+ val otherLowerTime = curProcessTime + otherLowerBound
+ val otherUpperTime = curProcessTime + otherUpperBound
+ if (winSize >= 0) {
+ // put row into state for later joining.
+ // (winSize == 0) joins rows received in the same millisecond.
var rowList = rowMapState.get(curProcessTime)
if (rowList == null) {
rowList = new util.ArrayList[Row]()
}
- rowList.add(value)
+ rowList.add(row)
rowMapState.put(curProcessTime, rowList)
+ // register a timer to remove the row from state once it is expired
+ if (timerState.value == 0) {
+ val cleanupTime = curProcessTime + winSize + 1
+ ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+ timerState.update(cleanupTime)
+ }
}
- // loop the other stream elements
- val oppositeKeyIter = oppoRowMapState.keys().iterator()
- while (oppositeKeyIter.hasNext) {
- val eleTime = oppositeKeyIter.next()
- if (eleTime < oppoLowerTime) {
- listToRemove.add(eleTime)
- } else if (eleTime <= oppoUpperTime) {
- val oppoRowList = oppoRowMapState.get(eleTime)
- var i = 0
- if (isLeft) {
- while (i < oppoRowList.size) {
- joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
+ // join row with rows received from the other input
+ val otherTimeIter = otherRowMapState.keys().iterator()
+ if (isLeft) {
+ // go over all timestamps in the other input's state
+ while (otherTimeIter.hasNext) {
+ val otherTimestamp = otherTimeIter.next()
+ if (otherTimestamp < otherLowerTime) {
+ // other timestamp is expired. Remove it later.
+ removeList.add(otherTimestamp)
+ } else if (otherTimestamp <= otherUpperTime) {
+ // join row with all rows from the other input for this timestamp
+ val otherRows = otherRowMapState.get(otherTimestamp)
+ var i = 0
+ while (i < otherRows.size) {
+ joinFunction.join(row, otherRows.get(i), cRowWrapper)
i += 1
}
- } else {
- while (i < oppoRowList.size) {
- joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
+ }
+ }
+ } else {
+ // go over all timestamps in the other input's state
+ while (otherTimeIter.hasNext) {
+ val otherTimestamp = otherTimeIter.next()
+ if (otherTimestamp < otherLowerTime) {
+ // other timestamp is expired. Remove it later.
+ removeList.add(otherTimestamp)
+ } else if (otherTimestamp <= otherUpperTime) {
+ // join row with all rows from the other input for this timestamp
+ val otherRows = otherRowMapState.get(otherTimestamp)
+ var i = 0
+ while (i < otherRows.size) {
+ joinFunction.join(otherRows.get(i), row, cRowWrapper)
i += 1
}
}
}
}
- // expire records out-of-time
- var i = listToRemove.size - 1
+ // remove rows for expired timestamps
+ var i = removeList.size - 1
while (i >= 0) {
- oppoRowMapState.remove(listToRemove.get(i))
+ otherRowMapState.remove(removeList.get(i))
i -= 1
}
- listToRemove.clear()
+ removeList.clear()
}
/**
@@ -292,32 +310,34 @@ class ProcTimeWindowInnerJoin(
val expiredTime = curTime - winSize
val keyIter = rowMapState.keys().iterator()
- var nextTimer: Long = 0
+ var validTimestamp: Boolean = false
// Search for expired timestamps.
// If we find a non-expired timestamp, remember the timestamp and leave the loop.
// This way we find all expired timestamps if they are sorted without doing a full pass.
- while (keyIter.hasNext && nextTimer == 0) {
+ while (keyIter.hasNext && !validTimestamp) {
val recordTime = keyIter.next
if (recordTime < expiredTime) {
- listToRemove.add(recordTime)
+ removeList.add(recordTime)
} else {
- nextTimer = recordTime
+ // we found a timestamp that is still valid
+ validTimestamp = true
}
}
// Remove expired records from state
- var i = listToRemove.size - 1
+ var i = removeList.size - 1
while (i >= 0) {
- rowMapState.remove(listToRemove.get(i))
+ rowMapState.remove(removeList.get(i))
i -= 1
}
- listToRemove.clear()
+ removeList.clear()
// If the state has non-expired timestamps, register a new timer.
// Otherwise clean the complete state for this input.
- if (nextTimer != 0) {
- ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
- timerState.update(nextTimer + winSize + 1)
+ if (validTimestamp) {
+ val cleanupTime = curTime + winSize + 1
+ ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+ timerState.update(cleanupTime)
} else {
timerState.clear()
rowMapState.clear()
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index fabeeba..c38d915 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -17,7 +17,6 @@
*/
package org.apache.flink.table.runtime.join
-import java.math.{BigDecimal => JBigDecimal}
import java.util
import org.apache.calcite.plan.RelOptUtil
@@ -27,263 +26,362 @@ import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlKind
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
/**
- * An util class to help analyze and build join code .
+ * A utility class to optimize and generate window joins.
*/
object WindowJoinUtil {
+ case class WindowBounds(isEventTime: Boolean, leftLowerBound: Long, leftUpperBound: Long)
+
+ protected case class WindowBound(bound: Long, isLeftLower: Boolean)
+ protected case class TimePredicate(
+ isEventTime: Boolean,
+ leftInputOnLeftSide: Boolean,
+ pred: RexCall)
+ protected case class TimeAttributeAccess(isEventTime: Boolean, isLeftInput: Boolean)
+
/**
- * Analyze time-condtion to get time boundary for each stream and get the time type
- * and return remain condition.
+ * Extracts the window bounds from a join predicate.
+ *
+ * @param predicate join predicate
+ * @param leftLogicalFieldCnt number of attributes on the left join input
+ * @param inputSchema schema of the join result
+ * @param rexBuilder RexBuilder
+ * @param config TableConfig
*
- * @param condition join condition
- * @param leftLogicalFieldCnt left stream logical field num
- * @param inputSchema join rowtype schema
- * @param rexBuilder util to build rexNode
- * @param config table environment config
- * @return isRowTime, left lower boundary, right lower boundary, remain condition
+ * @return A Tuple2 of extracted window bounds and remaining predicates.
*/
- private[flink] def analyzeTimeBoundary(
- condition: RexNode,
+ private[flink] def extractWindowBoundsFromPredicate(
+ predicate: RexNode,
leftLogicalFieldCnt: Int,
- inputSchema: RowSchema,
+ inputSchema: RelDataType,
rexBuilder: RexBuilder,
- config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+ config: TableConfig): (Option[WindowBounds], Option[RexNode]) = {
// Converts the condition to conjunctive normal form (CNF)
- val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+ val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
// split the condition into time indicator condition and other condition
- val (timeTerms, remainTerms) = cnfCondition match {
- case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
- c.getOperands.asScala
- .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
- .reduceLeft((l, r) => {
- (l._1 ++ r._1, l._2 ++ r._2)
- })
- case _ =>
- throw new TableException("A time-based stream join requires exactly " +
- "two join predicates that bound the time in both directions.")
+ val (timePreds, otherPreds) = cnfCondition match {
+ // We need at least two comparison predicates for a properly bounded window join.
+ // So we need an AND expression for a valid window join.
+ case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+ c.getOperands.asScala
+ .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+ .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+ analyzed match {
+ case Left(timePred) => (preds._1 :+ timePred, preds._2)
+ case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+ }
+ })
+ case _ =>
+ // No valid window bounds. A windowed stream join requires two comparison predicates that
+ // bound the time in both directions.
+ return (None, Some(predicate))
}
- if (timeTerms.size != 2) {
- throw new TableException("A time-based stream join requires exactly " +
- "two join predicates that bound the time in both directions.")
+ if (timePreds.size != 2) {
+ // No valid window bounds. A windowed stream join requires two comparison predicates that
+ // bound the time in both directions.
+ return (None, Some(predicate))
}
- // extract time offset from the time indicator conditon
- val streamTimeOffsets =
- timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, config))
-
+ // assemble window bounds from predicates
+ val streamTimeOffsets = timePreds.map(computeWindowBoundFromPredicate(_, rexBuilder, config))
val (leftLowerBound, leftUpperBound) =
streamTimeOffsets match {
- case Seq((x, true), (y, false)) => (x, y)
- case Seq((x, false), (y, true)) => (y, x)
+ case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+ (x.bound, y.bound)
+ case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+ (y.bound, x.bound)
case _ =>
- throw new TableException(
- "Time-based join conditions must reference the time attribute of both input tables.")
+ // A window join requires two comparison predicates that bound the time in both directions.
+ return (None, Some(predicate))
}
// compose the remain condition list into one condition
val remainCondition =
- remainTerms match {
- case Seq() => None
+ otherPreds match {
+ case Seq() =>
+ None
case _ =>
- // Converts logical field references to physical ones.
- Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
- RelOptUtil.andJoinFilters(rexBuilder, l, r)
- }))
+ Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
}
- val isRowTime: Boolean = timeTerms(0)._1 match {
- case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
- case _ => true
- }
- (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
+ val bounds = Some(WindowBounds(timePreds.head.isEventTime, leftLowerBound, leftUpperBound))
+
+ (bounds, remainCondition)
}
/**
- * Split the join conditions into time condition and non-time condition
+ * Analyzes a predicate and identifies whether it is a valid predicate for a window join.
+ * A valid window join predicate is a comparison predicate (<, <=, >=, >) that accesses
+ * time attributes of both inputs, each input on a different side of the condition.
+ * Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
+ *
+ * Examples:
+ * - left.rowtime > right.rowtime + 2.minutes => valid
+ * - left.proctime < right.rowtime + 2.minutes => invalid: different time type
+ * - left.rowtime == right.rowtime + 2.minutes => invalid: not a range predicate
+ * - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
*
- * @return (Seq(timeTerms), Seq(remainTerms)),
+ * If the predicate is a regular join predicate, i.e., it accesses no time attribute, it is
+ * returned as well.
+ *
+ * @return Either a valid time predicate (Left) or a valid non-time predicate (Right)
*/
- private def analyzeCondtionTermType(
- conditionTerm: RexNode,
+ private def identifyTimePredicate(
+ pred: RexNode,
leftFieldCount: Int,
- inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], Seq[RexNode]) = {
-
- conditionTerm match {
- case c: RexCall if Seq(SqlKind.GREATER_THAN, SqlKind.GREATER_THAN_OR_EQUAL,
- SqlKind.LESS_THAN, SqlKind.LESS_THAN_OR_EQUAL).contains(c.getKind) =>
- val timeIndicators = extractTimeIndicatorAccesses(c, leftFieldCount, inputType)
- timeIndicators match {
- case Seq() =>
- (Seq(), Seq(c))
- case Seq(v1, v2) =>
- if (v1._1 != v2._1) {
- throw new TableException(
- "Both time attributes in a join condition must be of the same type.")
+ inputType: RelDataType): Either[TimePredicate, RexNode] = {
+
+ pred match {
+ case c: RexCall =>
+ c.getKind match {
+ case SqlKind.GREATER_THAN |
+ SqlKind.GREATER_THAN_OR_EQUAL |
+ SqlKind.LESS_THAN |
+ SqlKind.LESS_THAN_OR_EQUAL =>
+
+ val leftTerm = c.getOperands.get(0)
+ val rightTerm = c.getOperands.get(1)
+
+ // validate that both sides of the condition do not access non-time attributes
+ if (accessesNonTimeAttribute(leftTerm, inputType) ||
+ accessesNonTimeAttribute(rightTerm, inputType)) {
+ return Right(pred)
}
- if (v1._2 == v2._2) {
- throw new TableException("Time-based join conditions " +
- "must reference the time attribute of both input tables.")
+
+ // get time attribute on left side of comparison
+ val leftTimeAttrAccess =
+ extractTimeAttributeAccesses(leftTerm, leftFieldCount, inputType) match {
+ case Seq() => None
+ case Seq(a) => Some(a)
+ case _ =>
+ // Window join predicate may only access a single time attribute on each side.
+ return Right(pred)
+ }
+
+ // get time attribute on right side of comparison
+ val rightTimeAccess =
+ extractTimeAttributeAccesses(rightTerm, leftFieldCount, inputType) match {
+ case Seq() => None
+ case Seq(a) => Some(a)
+ case _ =>
+ // Window join predicate may only access a single time attribute on each side.
+ return Right(pred)
+ }
+
+ // check if both sides of the condition access a time attribute,
+ // if both join inputs are accessed, and
+ // if both accesses are on the same time type (event-time or proc-time)
+ (leftTimeAttrAccess, rightTimeAccess) match {
+ case (None, None) =>
+ // neither left nor right accesses a time attribute. This is a regular join predicate
+ Right(c)
+ case (Some(_), None) | (None, Some(_)) =>
+ // Both sides of a window join predicate must access a time attribute.
+ Right(pred)
+ case (Some(left), Some(right)) if left.isEventTime != right.isEventTime =>
+ // Both time attributes in a window join predicate must be of the same type.
+ Right(pred)
+ case (Some(left), Some(right)) if left.isLeftInput == right.isLeftInput =>
+ // Window join predicates must reference the time attribute of both inputs.
+ Right(pred)
+ case (Some(left), Some(_)) =>
+ Left(TimePredicate(left.isEventTime, left.isLeftInput, c))
}
- (Seq((v1._1, v1._2, c)), Seq())
- case _ =>
- throw new TableException(
- "Time-based join conditions must reference the time attribute of both input tables.")
+ // not a comparison predicate.
+ case _ => Right(pred)
}
case other =>
- val timeIndicators = extractTimeIndicatorAccesses(other, leftFieldCount, inputType)
- timeIndicators match {
- case Seq() =>
- (Seq(), Seq(other))
- case _ =>
- throw new TableException("Time indicators can not be used in non time-condition.")
- }
+ Right(other)
}
}
/**
- * Extracts all time indicator attributes that are accessed in an expression.
+ * Extracts all time attributes that are accessed in an expression.
*
- * @return seq(timeType, is left input time indicator)
+ * @return A Seq of all time attributes accessed in the expression.
*/
- def extractTimeIndicatorAccesses(
- expression: RexNode,
+ def extractTimeAttributeAccesses(
+ expr: RexNode,
leftFieldCount: Int,
- inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
+ inputType: RelDataType): Seq[TimeAttributeAccess] = {
- expression match {
+ expr match {
case i: RexInputRef =>
+ // check if time attribute is accessed and from which input side
val idx = i.getIndex
inputType.getFieldList.get(idx).getType match {
- case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
- // left table time indicator
- Seq((t, true))
case t: TimeIndicatorRelDataType =>
- // right table time indicator
- Seq((t, false))
- case _ => Seq()
+ // time attribute access. Remember time type and side of input
+ val isLeftInput = idx < leftFieldCount
+ Seq(TimeAttributeAccess(t.isEventTime, isLeftInput))
+ case _ =>
+ // not a time attribute access.
+ Seq()
}
case c: RexCall =>
+ // concat time-attributes of all operands
c.operands.asScala
- .map(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
+ .map(extractTimeAttributeAccesses(_, leftFieldCount, inputType))
.reduce(_ ++ _)
case _ => Seq()
}
}
/**
+ * Checks if an expression accesses a time attribute.
+ *
+ * @param expr The expression to check.
+ * @param inputType The input type of the expression.
+ * @return True, if the expression accesses a time attribute. False otherwise.
+ */
+ def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+ expr match {
+ case i: RexInputRef =>
+ val accessedType = inputType.getFieldList.get(i.getIndex).getType
+ accessedType match {
+ case _: TimeIndicatorRelDataType => true
+ case _ => false
+ }
+ case c: RexCall =>
+ c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
+ case _ => false
+ }
+ }
+
+ /**
+ * Checks if an expression accesses a non-time attribute.
+ *
+ * @param expr The expression to check.
+ * @param inputType The input type of the expression.
+ * @return True, if the expression accesses a non-time attribute. False otherwise.
+ */
+ def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+ expr match {
+ case i: RexInputRef =>
+ val accessedType = inputType.getFieldList.get(i.getIndex).getType
+ accessedType match {
+ case _: TimeIndicatorRelDataType => false
+ case _ => true
+ }
+ case c: RexCall =>
+ c.operands.asScala.exists(accessesNonTimeAttribute(_, inputType))
+ case _ => false
+ }
+ }
+
+ /**
* Computes the absolute bound on the left operand of a comparison expression and
* whether the bound is an upper or lower bound.
*
* @return window boundary, is left lower bound
*/
- def extractTimeOffsetFromCondition(
- timeTerm: RexNode,
- isLeftExprBelongLeftTable: Boolean,
+ def computeWindowBoundFromPredicate(
+ timePred: TimePredicate,
rexBuilder: RexBuilder,
- config: TableConfig): (Long, Boolean) = {
-
- val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
+ config: TableConfig): Option[WindowBound] = {
val isLeftLowerBound: Boolean =
- timeTerm.getKind match {
- // e.g a.proctime > b.proctime - 5 sec, then it's the lower bound of a and the value is -5
- // e.g b.proctime > a.proctime - 5 sec, then it's not the lower bound of a but upper bound
- case kind@(SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
- isLeftExprBelongLeftTable
- // e.g a.proctime < b.proctime + 5 sec, the the upper bound of a is 5
- case kind@(SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
- !isLeftExprBelongLeftTable
- case _ =>
- throw new TableException("Unsupported time-condition.")
+ timePred.pred.getKind match {
+ case (SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
+ timePred.leftInputOnLeftSide
+ case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
+ !timePred.leftInputOnLeftSide
}
- val (leftLiteral, rightLiteral) =
- reduceTimeExpression(
- timeCall.operands.get(0),
- timeCall.operands.get(1),
- rexBuilder,
- config)
- val tmpTimeOffset: Long =
- if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else leftLiteral - rightLiteral
-
- val boundary =
- tmpTimeOffset.signum * (
- if (timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == SqlKind.GREATER_THAN) {
- tmpTimeOffset.abs - 1
- } else {
- tmpTimeOffset.abs
- })
-
- (boundary, isLeftLowerBound)
+ // reduce predicate to constants to compute bounds
+ val (leftLiteral, rightLiteral) = reduceTimeExpression(timePred, rexBuilder, config)
+
+ if (leftLiteral.isEmpty || rightLiteral.isEmpty) {
+ return None
+ }
+
+ // compute boundary
+ val tmpTimeOffset: Long = if (timePred.leftInputOnLeftSide) {
+ rightLiteral.get - leftLiteral.get
+ } else {
+ leftLiteral.get - rightLiteral.get
+ }
+ val boundary = timePred.pred.getKind match {
+ case SqlKind.LESS_THAN =>
+ tmpTimeOffset - 1
+ case SqlKind.GREATER_THAN =>
+ tmpTimeOffset + 1
+ case _ =>
+ tmpTimeOffset
+ }
+
+ Some(WindowBound(boundary, isLeftLowerBound))
}
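A worked example of the bound arithmetic above: for a.rowtime > b.rowtime - INTERVAL '5' SECOND, reduction (see reduceTimeExpression below) yields leftLiteral = 0 and rightLiteral = -5000, so with the left input on the left side the offset is -5000 - 0 = -5000, a lower bound on the left input; the strict comparison then excludes the boundary millisecond. A small sketch with the strictness adjustment factored out:

    // Mirrors the boundary computation above: strict comparisons tighten by 1 ms.
    def strictnessAdjusted(offset: Long, kind: String): Long = kind match {
      case "<" => offset - 1 // strict upper bound: exclude the boundary value
      case ">" => offset + 1 // strict lower bound: exclude the boundary value
      case _   => offset     // <= and >= keep the boundary inclusive
    }

    assert(strictnessAdjusted(-5000L, ">") == -4999L)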
/**
- * Calculates the time boundary by replacing the time attribute by a zero literal
- * and reducing the expression.
- * For example:
- * b.proctime - interval '1' second - interval '2' second will be translated to
- * 0 - 1000 - 2000
+ * Replaces the time attributes on both sides of a time predicate by a zero literal and
+ * reduces the expressions on both sides to a long literal.
+ *
+ * @param timePred The time predicate whose sides are reduced.
+ * @param rexBuilder A RexBuilder
+ * @param config A TableConfig.
+ * @return The values of the reduced literals on both sides of the time comparison predicate.
*/
private def reduceTimeExpression(
- leftRexNode: RexNode,
- rightRexNode: RexNode,
+ timePred: TimePredicate,
rexBuilder: RexBuilder,
- config: TableConfig): (Long, Long) = {
+ config: TableConfig): (Option[Long], Option[Long]) = {
/**
- * replace the rowtime/proctime with zero literal.
+ * Replaces the time attribute with a zero literal.
*/
def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
expr match {
case c: RexCall =>
// replace in call operands
- val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral(_)).asJava
+ val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral).asJava
rexBuilder.makeCall(c.getType, c.getOperator, newOps)
case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) =>
// replace with timestamp
rexBuilder.makeZeroLiteral(expr.getType)
- case _: RexInputRef =>
- throw new TableException("Time join condition may only reference time indicator fields.")
case _ => expr
}
}
- val literalLeftRex = replaceTimeFieldWithLiteral(leftRexNode)
- val literalRightRex = replaceTimeFieldWithLiteral(rightRexNode)
+ val leftSide = timePred.pred.operands.get(0)
+ val rightSide = timePred.pred.operands.get(1)
+ val leftSideWithLiteral = replaceTimeFieldWithLiteral(leftSide)
+ val rightSideWithLiteral = replaceTimeFieldWithLiteral(rightSide)
+
+ // reduce expression to literal
val exprReducer = new ExpressionReducer(config)
val originList = new util.ArrayList[RexNode]()
- originList.add(literalLeftRex)
- originList.add(literalRightRex)
+ originList.add(leftSideWithLiteral)
+ originList.add(rightSideWithLiteral)
val reduceList = new util.ArrayList[RexNode]()
exprReducer.reduce(rexBuilder, originList, reduceList)
- val literals = reduceList.asScala.map(f => f match {
+ // extract bounds from the reduced literals
+ val literals = reduceList.asScala.map {
case literal: RexLiteral =>
- literal.getValue2.asInstanceOf[Long]
+ Some(literal.getValue2.asInstanceOf[Long])
case _ =>
- throw TableException(
- "Time condition may only consist of time attributes, literals, and arithmetic operators.")
- })
+ None
+ }
- (literals(0), literals(1))
+ (literals.head, literals(1))
}
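Spelling out the reduction with the example from the removed docstring: the time attribute becomes a zero literal and the interval arithmetic folds to a millisecond offset.

    // b.proctime - INTERVAL '1' SECOND - INTERVAL '2' SECOND
    //   => 0 - 1000 - 2000
    val reducedOffsetMs: Long = 0L - 1000L - 2000L
    assert(reducedOffsetMs == -3000L)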
-
/**
- * Generate other non-equi condition function
+ * Generates a JoinFunction that applies additional join predicates and projects the result.
*
- * @param config table env config
- * @param joinType join type to determain whether input can be null
+ * @param config table environment configuration
+ * @param joinType join type, used to determine whether inputs can be null
@@ -300,7 +398,7 @@ object WindowJoinUtil {
rightType: TypeInformation[Row],
returnType: RowSchema,
otherCondition: Option[RexNode],
- ruleDescription: String) = {
+ ruleDescription: String): GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row] = {
// whether input can be null
val nullCheck = joinType match {
@@ -311,7 +409,7 @@ object WindowJoinUtil {
}
// generate other non-equi function code
- val generator = new CodeGenerator(
+ val generator = new FunctionCodeGenerator(
config,
nullCheck,
leftType,
@@ -329,7 +427,9 @@ object WindowJoinUtil {
|${generator.collectorTerm}.collect(${conversion.resultTerm});
|""".stripMargin
case Some(remainCondition) =>
- val genCond = generator.generateExpression(remainCondition)
+ // map logical field accesses to physical accesses
+ val physicalCondition = returnType.mapRexNode(remainCondition)
+ val genCond = generator.generateExpression(physicalCondition)
s"""
|${genCond.code}
|if (${genCond.resultTerm}) {
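For orientation, a hand-written Scala sketch of what the generated function amounts to. The real implementation is assembled as a code string by FunctionCodeGenerator; SketchedJoinFunction and remainingCond are hypothetical stand-ins for the compiled class and the generated condition expression.

    import org.apache.flink.api.common.functions.FlatJoinFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Evaluates the remaining (non-window) predicate, then emits the joined row.
    class SketchedJoinFunction(remainingCond: (Row, Row) => Boolean)
      extends FlatJoinFunction[Row, Row, Row] {

      override def join(left: Row, right: Row, out: Collector[Row]): Unit = {
        if (remainingCond(left, right)) {
          // conversion step: concatenate the fields of both inputs
          val result = new Row(left.getArity + right.getArity)
          var i = 0
          while (i < left.getArity) { result.setField(i, left.getField(i)); i += 1 }
          var j = 0
          while (j < right.getArity) {
            result.setField(left.getArity + j, right.getField(j)); j += 1
          }
          out.collect(result)
        }
      }
    }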
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
deleted file mode 100644
index 769ba55..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.updateutils
-
-import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.plan.nodes.datastream._
-import _root_.scala.collection.JavaConverters._
-
-object UpdateCheckUtils {
-
- /** Validates that the plan produces only append changes. */
- def isAppendOnly(plan: RelNode): Boolean = {
- val appendOnlyValidator = new AppendOnlyValidator
- appendOnlyValidator.go(plan)
-
- appendOnlyValidator.isAppendOnly
- }
-
- /** Extracts the unique keys of the table produced by the plan. */
- def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
- val keyExtractor = new UniqueKeyExtractor
- keyExtractor.go(plan)
- keyExtractor.keys
- }
-
- private class AppendOnlyValidator extends RelVisitor {
-
- var isAppendOnly = true
-
- override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
- node match {
- case s: DataStreamRel if s.producesUpdates =>
- isAppendOnly = false
- case _ =>
- super.visit(node, ordinal, parent)
- }
- }
- }
-
- /** Identifies unique key fields in the output of a RelNode. */
- private class UniqueKeyExtractor extends RelVisitor {
-
- var keys: Option[Array[String]] = None
-
- override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
- node match {
- case c: DataStreamCalc =>
- super.visit(node, ordinal, parent)
- // check if input has keys
- if (keys.isDefined) {
- // track keys forward
- val inNames = c.getInput.getRowType.getFieldNames
- val inOutNames = c.getProgram.getNamedProjects.asScala
- .map(p => {
- c.getProgram.expandLocalRef(p.left) match {
- // output field is forwarded input field
- case i: RexInputRef => (i.getIndex, p.right)
- // output field is renamed input field
- case a: RexCall if a.getKind.equals(SqlKind.AS) =>
- a.getOperands.get(0) match {
- case ref: RexInputRef =>
- (ref.getIndex, p.right)
- case _ =>
- (-1, p.right)
- }
- // output field is not forwarded from input
- case _: RexNode => (-1, p.right)
- }
- })
- // filter all non-forwarded fields
- .filter(_._1 >= 0)
- // resolve names of input fields
- .map(io => (inNames.get(io._1), io._2))
-
- // filter by input keys
- val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
- // check if all keys have been preserved
- if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
- // all key have been preserved (but possibly renamed)
- keys = Some(outKeys.toArray)
- } else {
- // some (or all) keys have been removed. Keys are no longer unique and removed
- keys = None
- }
- }
- case _: DataStreamOverAggregate =>
- super.visit(node, ordinal, parent)
- // keys are always forwarded by Over aggregate
- case a: DataStreamGroupAggregate =>
- // get grouping keys
- val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
- keys = Some(groupKeys.toArray)
- case w: DataStreamGroupWindowAggregate =>
- // get grouping keys
- val groupKeys =
- w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
- // get window start and end time
- val windowStartEnd = w.getWindowProperties.map(_.name)
- // we have only a unique key if at least one window property is selected
- if (windowStartEnd.nonEmpty) {
- keys = Some(groupKeys ++ windowStartEnd)
- }
- case _: DataStreamRel =>
- // anything else does not forward keys or might duplicate key, so we can stop
- keys = None
- }
- }
-
- }
-
-}
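The key-tracking idea of the deleted UniqueKeyExtractor, reduced to a self-contained sketch (forwardKeys and its parameters are hypothetical; the rule mirrors the checks above: unique keys survive a projection only if every key field is forwarded, possibly renamed):

    // forwardedFields pairs (input field name, output field name) for all
    // fields a projection forwards; anything not listed was dropped or computed.
    def forwardKeys(
        inputKeys: Set[String],
        forwardedFields: Seq[(String, String)]): Option[Set[String]] = {
      val outKeys = forwardedFields.collect {
        case (in, out) if inputKeys.contains(in) => out
      }.toSet
      // unique keys survive only if all of them were preserved
      if (inputKeys.nonEmpty && outKeys.size == inputKeys.size) Some(outKeys) else None
    }

    // e.g. forwardKeys(Set("id"), Seq("id" -> "user_id", "name" -> "name"))
    //   == Some(Set("user_id"))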
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
deleted file mode 100644
index d4ff3f7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit._
-
-import scala.collection.mutable
-
-class JoinITCase extends StreamingWithStateTestBase {
-
- val data = List(
- (1L, 1, "Hello"),
- (2L, 2, "Hello"),
- (3L, 3, "Hello"),
- (4L, 4, "Hello"),
- (5L, 5, "Hello"),
- (6L, 6, "Hello"),
- (7L, 7, "Hello World"),
- (8L, 8, "Hello World"),
- (20L, 20, "Hello World"))
-
- /** test process time inner join **/
- @Test
- def testProcessTimeInnerJoin(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
-
- val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
- "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
-
- val data1 = new mutable.MutableList[(Int, Long, String)]
- data1.+=((1, 1L, "Hi1"))
- data1.+=((1, 2L, "Hi2"))
- data1.+=((1, 5L, "Hi3"))
- data1.+=((2, 7L, "Hi5"))
- data1.+=((1, 9L, "Hi6"))
- data1.+=((1, 8L, "Hi8"))
-
- val data2 = new mutable.MutableList[(Int, Long, String)]
- data2.+=((1, 1L, "HiHi"))
- data2.+=((2, 2L, "HeHe"))
-
- val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
- val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- }
-
- /** test process time inner join with other condition **/
- @Test
- def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
-
- val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
- "t1.proctime between t2.proctime - interval '5' second " +
- "and t2.proctime + interval '5' second " +
- "and t1.b > t2.b and t1.b + t2.b < 14"
-
- val data1 = new mutable.MutableList[(String, Long, String)]
- data1.+=(("1", 1L, "Hi1"))
- data1.+=(("1", 2L, "Hi2"))
- data1.+=(("1", 5L, "Hi3"))
- data1.+=(("2", 7L, "Hi5"))
- data1.+=(("1", 9L, "Hi6"))
- data1.+=(("1", 8L, "Hi8"))
-
- val data2 = new mutable.MutableList[(String, Long, String)]
- data2.+=(("1", 5L, "HiHi"))
- data2.+=(("2", 2L, "HeHe"))
-
- val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
- val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- }
-
-}
-