You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/09 12:37:06 UTC
[2/3] flink git commit: [FLINK-7797] [table] Add support for windowed
outer joins for streaming tables.
[FLINK-7797] [table] Add support for windowed outer joins for streaming tables.
This closes #5140.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/222e6945
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/222e6945
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/222e6945
Branch: refs/heads/master
Commit: 222e6945b795cf45ee5aa5228fb207e980854046
Parents: e6fbfdc
Author: Xingcan Cui <xi...@gmail.com>
Authored: Fri Dec 8 01:28:40 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 9 12:05:05 2018 +0100
----------------------------------------------------------------------
docs/dev/table/sql.md | 2 -
docs/dev/table/tableApi.md | 4 -
.../nodes/datastream/DataStreamWindowJoin.scala | 170 +++---
.../table/runtime/join/EmitAwareCollector.scala | 54 ++
.../runtime/join/OuterJoinPaddingUtil.scala | 71 +++
.../join/ProcTimeBoundedStreamInnerJoin.scala | 67 ---
.../join/ProcTimeBoundedStreamJoin.scala | 70 +++
.../join/RowTimeBoundedStreamInnerJoin.scala | 82 ---
.../runtime/join/RowTimeBoundedStreamJoin.scala | 85 +++
.../join/TimeBoundedStreamInnerJoin.scala | 424 ---------------
.../runtime/join/TimeBoundedStreamJoin.scala | 535 +++++++++++++++++++
.../flink/table/api/stream/sql/JoinTest.scala | 275 ++++++++++
.../flink/table/api/stream/table/JoinTest.scala | 326 +++++++++--
.../table/runtime/harness/JoinHarnessTest.scala | 526 +++++++++++++++---
.../table/runtime/stream/sql/JoinITCase.scala | 507 +++++++++++++++++-
15 files changed, 2404 insertions(+), 794 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 04d6e84..d9a4ce7 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -408,8 +408,6 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
<li><code>ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE</code></li>
<li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
</ul>
-
- <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight sql %}
SELECT *
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 2b58a62..3b31325 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -535,8 +535,6 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
<li><code>ltime >= rtime && ltime < rtime + 10.minutes</code></li>
</ul>
- <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
-
{% highlight java %}
Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
@@ -652,8 +650,6 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
<li><code>'ltime === 'rtime</code></li>
<li><code>'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes</code></li>
</ul>
-
- <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 27f2c74..94402672 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -23,17 +23,21 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.plan.nodes.CommonJoin
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.util.UpdatingPlanChecker
-import org.apache.flink.table.runtime.CRowKeySelector
-import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.{CRowKeySelector, CRowWrappingCollector}
+import org.apache.flink.table.runtime.join.{OuterJoinPaddingUtil, ProcTimeBoundedStreamJoin, RowTimeBoundedStreamJoin, WindowJoinUtil}
import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.util.Logging
@@ -142,71 +146,98 @@ class DataStreamWindowJoin(
s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
s"join: (${joinSelectionToString(schema.relDataType)})"
- joinType match {
- case JoinRelType.INNER =>
- if (relativeWindowSize < 0) {
- LOG.warn(s"The relative window size $relativeWindowSize is negative," +
- " please check the join conditions.")
- createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
- } else {
- if (isRowTime) {
- createRowTimeInnerJoin(
- leftDataStream,
- rightDataStream,
- returnTypeInfo,
- joinOpName,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
- )
- } else {
- createProcTimeInnerJoin(
- leftDataStream,
- rightDataStream,
- returnTypeInfo,
- joinOpName,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
- )
- }
- }
- case JoinRelType.FULL =>
- throw new TableException(
- "Full join between stream and stream is not supported yet.")
- case JoinRelType.LEFT =>
- throw new TableException(
- "Left join between stream and stream is not supported yet.")
- case JoinRelType.RIGHT =>
- throw new TableException(
- "Right join between stream and stream is not supported yet.")
+ val flinkJoinType = joinType match {
+ case JoinRelType.INNER => JoinType.INNER
+ case JoinRelType.FULL => JoinType.FULL_OUTER
+ case JoinRelType.LEFT => JoinType.LEFT_OUTER
+ case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+ }
+
+ if (relativeWindowSize < 0) {
+ LOG.warn(s"The relative window size $relativeWindowSize is negative," +
+ " please check the join conditions.")
+ createNegativeWindowSizeJoin(
+ flinkJoinType,
+ leftDataStream,
+ rightDataStream,
+ leftSchema.arity,
+ rightSchema.arity,
+ returnTypeInfo)
+ } else {
+ if (isRowTime) {
+ createRowTimeJoin(
+ flinkJoinType,
+ leftDataStream,
+ rightDataStream,
+ returnTypeInfo,
+ joinOpName,
+ joinFunction.name,
+ joinFunction.code,
+ leftKeys,
+ rightKeys
+ )
+ } else {
+ createProcTimeJoin(
+ flinkJoinType,
+ leftDataStream,
+ rightDataStream,
+ returnTypeInfo,
+ joinOpName,
+ joinFunction.name,
+ joinFunction.code,
+ leftKeys,
+ rightKeys
+ )
+ }
}
}
- def createEmptyInnerJoin(
+ /**
+ * Translates a time-windowed join whose relative window size is negative, i.e. whose time
+ * condition is assumed to be unsatisfiable (NOTE(review): presumably because the lower time
+ * bound exceeds the upper one — confirm against the window-size computation).
+ * No joined rows are produced: an inner join emits nothing, while an outer join emits every
+ * row of its preserved input(s) padded with nulls for the fields of the other input.
+ *
+ * @param joinType the Flink join type (INNER, LEFT_OUTER, RIGHT_OUTER or FULL_OUTER)
+ * @param leftDataStream the left input
+ * @param rightDataStream the right input
+ * @param leftArity the number of fields of the left input rows
+ * @param rightArity the number of fields of the right input rows
+ * @param returnTypeInfo the type of the join result
+ * @return the union of the filtered and/or null-padded left and right inputs
+ */
+ def createNegativeWindowSizeJoin(
+ joinType: JoinType,
leftDataStream: DataStream[CRow],
rightDataStream: DataStream[CRow],
+ leftArity: Int,
+ rightArity: Int,
returnTypeInfo: TypeInformation[CRow]): DataStream[CRow] = {
- leftDataStream.connect(rightDataStream).process(
- new CoProcessFunction[CRow, CRow, CRow] {
- override def processElement1(
- value: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- //Do nothing.
- }
- override def processElement2(
- value: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- //Do nothing.
- }
- }).returns(returnTypeInfo)
+ // We filter all records instead of adding an empty source to preserve the watermarks.
+ val allFilter = new FlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
+ override def flatMap(value: CRow, out: Collector[CRow]): Unit = { }
+ override def getProducedType: TypeInformation[CRow] = returnTypeInfo
+ }
+
+ // Emits every left row as an accumulate message, padded with nulls for the right fields.
+ // NOTE(review): padLeft returns the same Row instance on every call; confirm that no
+ // downstream operator holds on to it across records (e.g. with object reuse enabled).
+ val leftPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
+ val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
+ override def map(value: CRow): CRow = new CRow(paddingUtil.padLeft(value.row), true)
+ override def getProducedType: TypeInformation[CRow] = returnTypeInfo
+ }
+
+ // Emits every right row as an accumulate message, padded with nulls for the left fields.
+ // NOTE(review): padRight also returns a reused Row instance (see the note above padLeft use).
+ val rightPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
+ val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
+ override def map(value: CRow): CRow = new CRow(paddingUtil.padRight(value.row), true)
+ override def getProducedType: TypeInformation[CRow] = returnTypeInfo
+ }
+
+ // Each side runs with the parallelism of its own input.
+ val leftP = leftDataStream.getParallelism
+ val rightP = rightDataStream.getParallelism
+
+ // Inner join: drop both sides. Outer joins: pad the preserved side(s) and drop the other.
+ joinType match {
+ case JoinType.INNER =>
+ leftDataStream.flatMap(allFilter).name("Empty Inner Join").setParallelism(leftP)
+ .union(rightDataStream.flatMap(allFilter).name("Empty Inner Join").setParallelism(rightP))
+ case JoinType.LEFT_OUTER =>
+ leftDataStream.map(leftPadder).name("Left Outer Join").setParallelism(leftP)
+ .union(rightDataStream.flatMap(allFilter).name("Left Outer Join").setParallelism(rightP))
+ case JoinType.RIGHT_OUTER =>
+ leftDataStream.flatMap(allFilter).name("Right Outer Join").setParallelism(leftP)
+ .union(rightDataStream.map(rightPadder).name("Right Outer Join").setParallelism(rightP))
+ case JoinType.FULL_OUTER =>
+ leftDataStream.map(leftPadder).name("Full Outer Join").setParallelism(leftP)
+ .union(rightDataStream.map(rightPadder).name("Full Outer Join").setParallelism(rightP))
+ }
}
- def createProcTimeInnerJoin(
+ def createProcTimeJoin(
+ joinType: JoinType,
leftDataStream: DataStream[CRow],
rightDataStream: DataStream[CRow],
returnTypeInfo: TypeInformation[CRow],
@@ -216,7 +247,8 @@ class DataStreamWindowJoin(
leftKeys: Array[Int],
rightKeys: Array[Int]): DataStream[CRow] = {
- val procInnerJoinFunc = new ProcTimeBoundedStreamInnerJoin(
+ val procJoinFunc = new ProcTimeBoundedStreamJoin(
+ joinType,
leftLowerBound,
leftUpperBound,
leftSchema.typeInfo,
@@ -229,13 +261,13 @@ class DataStreamWindowJoin(
.keyBy(
new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)),
new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys)))
- .process(procInnerJoinFunc)
+ .process(procJoinFunc)
.name(operatorName)
.returns(returnTypeInfo)
} else {
leftDataStream.connect(rightDataStream)
.keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow]())
- .process(procInnerJoinFunc)
+ .process(procJoinFunc)
.setParallelism(1)
.setMaxParallelism(1)
.name(operatorName)
@@ -243,7 +275,8 @@ class DataStreamWindowJoin(
}
}
- def createRowTimeInnerJoin(
+ def createRowTimeJoin(
+ joinType: JoinType,
leftDataStream: DataStream[CRow],
rightDataStream: DataStream[CRow],
returnTypeInfo: TypeInformation[CRow],
@@ -253,7 +286,8 @@ class DataStreamWindowJoin(
leftKeys: Array[Int],
rightKeys: Array[Int]): DataStream[CRow] = {
- val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin(
+ val rowTimeJoinFunc = new RowTimeBoundedStreamJoin(
+ joinType,
leftLowerBound,
leftUpperBound,
allowedLateness = 0L,
@@ -274,8 +308,8 @@ class DataStreamWindowJoin(
operatorName,
returnTypeInfo,
new KeyedCoProcessOperatorWithWatermarkDelay[Tuple, CRow, CRow, CRow](
- rowTimeInnerJoinFunc,
- rowTimeInnerJoinFunc.getMaxOutputDelay)
+ rowTimeJoinFunc,
+ rowTimeJoinFunc.getMaxOutputDelay)
)
} else {
leftDataStream.connect(rightDataStream)
@@ -284,8 +318,8 @@ class DataStreamWindowJoin(
operatorName,
returnTypeInfo,
new KeyedCoProcessOperatorWithWatermarkDelay[java.lang.Byte, CRow, CRow, CRow](
- rowTimeInnerJoinFunc,
- rowTimeInnerJoinFunc.getMaxOutputDelay)
+ rowTimeJoinFunc,
+ rowTimeJoinFunc.getMaxOutputDelay)
)
.setParallelism(1)
.setMaxParallelism(1)
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala
new file mode 100644
index 0000000..f11ffa3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/EmitAwareCollector.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * A [[Collector]] for join results. It wraps every collected [[Row]] into a single, reused
+ * [[CRow]] and forwards that to `innerCollector`. It also remembers whether anything has been
+ * collected since construction or the last call to `reset()`.
+ */
+class EmitAwareCollector extends Collector[Row] {
+
+  /** True once `collect` has been called; cleared by `reset()`. */
+  var emitted: Boolean = false
+
+  /** The downstream collector; must be assigned before `collect` or `close` is called. */
+  var innerCollector: Collector[CRow] = _
+
+  // The one CRow instance that wraps every forwarded record.
+  private val reusedCRow: CRow = new CRow()
+
+  /** Sets the change flag carried by every subsequently forwarded [[CRow]]. */
+  def setCRowChange(change: Boolean): Unit = reusedCRow.change = change
+
+  /** Clears the `emitted` flag. */
+  def reset(): Unit = emitted = false
+
+  /** Marks this collector as having emitted, then forwards `record` wrapped in the reused CRow. */
+  override def collect(record: Row): Unit = {
+    emitted = true
+    reusedCRow.row = record
+    innerCollector.collect(reusedCRow)
+  }
+
+  /** Closes the wrapped collector. */
+  override def close(): Unit = innerCollector.close()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala
new file mode 100644
index 0000000..6f850a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/OuterJoinPaddingUtil.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.types.Row
+
+/**
+ * A utility to generate reusable null-padded results for outer joins.
+ *
+ * Every result row has `leftArity + rightArity` fields. The fields of the left input occupy the
+ * positions `[0, leftArity)` and the fields of the right input occupy the positions
+ * `[leftArity, leftArity + rightArity)`.
+ *
+ * Note that [[padLeft]] and [[padRight]] each return one and the same [[Row]] instance on every
+ * call, so each call overwrites the previous result.
+ *
+ * @param leftArity  the number of fields of the left input rows
+ * @param rightArity the number of fields of the right input rows
+ */
+class OuterJoinPaddingUtil(leftArity: Int, rightArity: Int) extends java.io.Serializable {
+
+  private val resultArity = leftArity + rightArity
+
+  // Reusable result whose left fields are null; its right fields are filled by padRight.
+  private val leftNullPaddingResult = new Row(resultArity)
+  // Reusable result whose right fields are null; its left fields are filled by padLeft.
+  private val rightNullPaddingResult = new Row(resultArity)
+
+  // Initialize the null padding of the two reusable results. The loop indices are local so that
+  // they do not become (public, mutable, serialized) fields of this class.
+  (0 until leftArity).foreach(i => leftNullPaddingResult.setField(i, null))
+  (0 until rightArity).foreach(i => rightNullPaddingResult.setField(leftArity + i, null))
+
+  /**
+   * Returns a padding result for the given right row. Its left fields are null and its right
+   * fields are copied from `rightRow`.
+   *
+   * @param rightRow the right row to pad; must have at least `rightArity` fields
+   * @return the reusable left-null-padded result (overwritten by the next call)
+   */
+  final def padRight(rightRow: Row): Row = {
+    var i = 0
+    while (i < rightArity) {
+      leftNullPaddingResult.setField(leftArity + i, rightRow.getField(i))
+      i += 1
+    }
+    leftNullPaddingResult
+  }
+
+  /**
+   * Returns a padding result for the given left row. Its left fields are copied from `leftRow`
+   * and its right fields are null.
+   *
+   * @param leftRow the left row to pad; must have at least `leftArity` fields
+   * @return the reusable right-null-padded result (overwritten by the next call)
+   */
+  final def padLeft(leftRow: Row): Row = {
+    var i = 0
+    while (i < leftArity) {
+      rightNullPaddingResult.setField(i, leftRow.getField(i))
+      i += 1
+    }
+    rightNullPaddingResult
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
deleted file mode 100644
index 3bac42c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.join
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-
-/**
- * The function to execute processing time bounded stream inner-join.
- */
-final class ProcTimeBoundedStreamInnerJoin(
- leftLowerBound: Long,
- leftUpperBound: Long,
- leftType: TypeInformation[Row],
- rightType: TypeInformation[Row],
- genJoinFuncName: String,
- genJoinFuncCode: String)
- extends TimeBoundedStreamInnerJoin(
- leftLowerBound,
- leftUpperBound,
- allowedLateness = 0L,
- leftType,
- rightType,
- genJoinFuncName,
- genJoinFuncCode) {
-
- override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
- leftOperatorTime = ctx.timerService().currentProcessingTime()
- rightOperatorTime = leftOperatorTime
- }
-
- override def getTimeForLeftStream(
- context: CoProcessFunction[CRow, CRow, CRow]#Context,
- row: Row): Long = {
- leftOperatorTime
- }
-
- override def getTimeForRightStream(
- context: CoProcessFunction[CRow, CRow, CRow]#Context,
- row: Row): Long = {
- rightOperatorTime
- }
-
- override def registerTimer(
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- cleanupTime: Long): Unit = {
- ctx.timerService.registerProcessingTimeTimer(cleanupTime)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala
new file mode 100644
index 0000000..94f06d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamJoin.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+ * The function to execute processing time bounded stream joins. Depending on `joinType`, this is
+ * an inner, left outer, right outer or full outer join.
+ *
+ * Rows of both inputs are timestamped with the operator's current processing time, and cleanup
+ * timers are registered as processing-time timers. No lateness is allowed.
+ *
+ * @param joinType        the type of the join (inner, left/right/full outer)
+ * @param leftLowerBound  the lower bound for the left stream (X in
+ *                        `L.time between R.time + X and R.time + Y`)
+ * @param leftUpperBound  the upper bound for the left stream (Y in the same criteria)
+ * @param leftType        the type information of the left input rows
+ * @param rightType       the type information of the right input rows
+ * @param genJoinFuncName the name of the generated join function
+ * @param genJoinFuncCode the code of the generated join function
+ */
+final class ProcTimeBoundedStreamJoin(
+    joinType: JoinType,
+    leftLowerBound: Long,
+    leftUpperBound: Long,
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    genJoinFuncName: String,
+    genJoinFuncCode: String)
+  extends TimeBoundedStreamJoin(
+    joinType,
+    leftLowerBound,
+    leftUpperBound,
+    allowedLateness = 0L,
+    leftType,
+    rightType,
+    genJoinFuncName,
+    genJoinFuncCode) {
+
+  /** Sets the operator time of both inputs to the current processing time. */
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+    leftOperatorTime = ctx.timerService().currentProcessingTime()
+    rightOperatorTime = leftOperatorTime
+  }
+
+  /** Left rows are timestamped with the operator time set by [[updateOperatorTime]]. */
+  override def getTimeForLeftStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    leftOperatorTime
+  }
+
+  /** Right rows are timestamped with the operator time set by [[updateOperatorTime]]. */
+  override def getTimeForRightStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    rightOperatorTime
+  }
+
+  /** Registers the state cleanup timer as a processing-time timer. */
+  override def registerTimer(
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      cleanupTime: Long): Unit = {
+    ctx.timerService.registerProcessingTimeTimer(cleanupTime)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
deleted file mode 100644
index a2d9dca..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.join
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-
-/**
- * The function to execute row(event) time bounded stream inner-join.
- */
-final class RowTimeBoundedStreamInnerJoin(
- leftLowerBound: Long,
- leftUpperBound: Long,
- allowedLateness: Long,
- leftType: TypeInformation[Row],
- rightType: TypeInformation[Row],
- genJoinFuncName: String,
- genJoinFuncCode: String,
- leftTimeIdx: Int,
- rightTimeIdx: Int)
- extends TimeBoundedStreamInnerJoin(
- leftLowerBound,
- leftUpperBound,
- allowedLateness,
- leftType,
- rightType,
- genJoinFuncName,
- genJoinFuncCode) {
-
- /**
- * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
- * This is the time interval by which watermarks need to be held back.
- *
- * @return the maximum delay for the outputs
- */
- def getMaxOutputDelay: Long = Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
-
- override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
- leftOperatorTime =
- if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark()
- else 0L
- // We may set different operator times in the future.
- rightOperatorTime = leftOperatorTime
- }
-
- override def getTimeForLeftStream(
- context: CoProcessFunction[CRow, CRow, CRow]#Context,
- row: Row): Long = {
- row.getField(leftTimeIdx).asInstanceOf[Long]
- }
-
- override def getTimeForRightStream(
- context: CoProcessFunction[CRow, CRow, CRow]#Context,
- row: Row): Long = {
- row.getField(rightTimeIdx).asInstanceOf[Long]
- }
-
- override def registerTimer(
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- cleanupTime: Long): Unit = {
- // Maybe we can register timers for different streams in the future.
- ctx.timerService.registerEventTimeTimer(cleanupTime)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala
new file mode 100644
index 0000000..619b8b4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamJoin.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+ * The function to execute row (event) time bounded stream joins. Depending on `joinType`, this is
+ * an inner, left outer, right outer or full outer join.
+ *
+ * Rows are timestamped with their rowtime attribute (`leftTimeIdx` / `rightTimeIdx`). The
+ * operator time follows the current watermark (clamped to be non-negative), and cleanup timers
+ * are registered as event-time timers.
+ *
+ * @param joinType        the type of the join (inner, left/right/full outer)
+ * @param leftLowerBound  the lower bound for the left stream (X in
+ *                        `L.time between R.time + X and R.time + Y`)
+ * @param leftUpperBound  the upper bound for the left stream (Y in the same criteria)
+ * @param allowedLateness the allowed lateness; it is added to the delay by which watermarks are
+ *                        held back (see [[getMaxOutputDelay]])
+ * @param leftType        the type information of the left input rows
+ * @param rightType       the type information of the right input rows
+ * @param genJoinFuncName the name of the generated join function
+ * @param genJoinFuncCode the code of the generated join function
+ * @param leftTimeIdx     the index of the rowtime attribute in the left rows
+ * @param rightTimeIdx    the index of the rowtime attribute in the right rows
+ */
+final class RowTimeBoundedStreamJoin(
+    joinType: JoinType,
+    leftLowerBound: Long,
+    leftUpperBound: Long,
+    allowedLateness: Long,
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    leftTimeIdx: Int,
+    rightTimeIdx: Int)
+  extends TimeBoundedStreamJoin(
+    joinType,
+    leftLowerBound,
+    leftUpperBound,
+    allowedLateness,
+    leftType,
+    rightType,
+    genJoinFuncName,
+    genJoinFuncCode) {
+
+  /**
+   * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
+   * This is the time interval by which watermarks need to be held back.
+   *
+   * @return the maximum delay for the outputs
+   */
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
+
+  /** Sets the operator time of both inputs to the current watermark, but never below 0. */
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+    // Read the watermark once and clamp it so that the operator time is never negative.
+    leftOperatorTime = Math.max(ctx.timerService().currentWatermark(), 0L)
+    // We may set different operator times in the future.
+    rightOperatorTime = leftOperatorTime
+  }
+
+  /** Left rows are timestamped with their rowtime field at `leftTimeIdx`. */
+  override def getTimeForLeftStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    row.getField(leftTimeIdx).asInstanceOf[Long]
+  }
+
+  /** Right rows are timestamped with their rowtime field at `rightTimeIdx`. */
+  override def getTimeForRightStream(
+      context: CoProcessFunction[CRow, CRow, CRow]#Context,
+      row: Row): Long = {
+    row.getField(rightTimeIdx).asInstanceOf[Long]
+  }
+
+  /** Registers the state cleanup timer as an event-time timer. */
+  override def registerTimer(
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      cleanupTime: Long): Unit = {
+    // Maybe we can register timers for different streams in the future.
+    ctx.timerService.registerEventTimeTimer(cleanupTime)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
deleted file mode 100644
index 9625eac..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.join
-
-import java.util
-import java.util.{List => JList}
-
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.flink.api.common.state._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ListTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.CRowWrappingCollector
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.table.util.Logging
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-
-/**
- * A CoProcessFunction to execute time-bounded stream inner-join.
- * Two kinds of time criteria:
- * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X" where
- * X and Y might be negative or positive and X <= Y.
- *
- * @param leftLowerBound the lower bound for the left stream (X in the criteria)
- * @param leftUpperBound the upper bound for the left stream (Y in the criteria)
- * @param allowedLateness the lateness allowed for the two streams
- * @param leftType the input type of left stream
- * @param rightType the input type of right stream
- * @param genJoinFuncName the name of the generated function
- * @param genJoinFuncCode the code of function to evaluate the non-window join conditions
- * @throws IllegalArgumentException if allowedLateness is negative
- *
- */
-abstract class TimeBoundedStreamInnerJoin(
- private val leftLowerBound: Long,
- private val leftUpperBound: Long,
- private val allowedLateness: Long,
- private val leftType: TypeInformation[Row],
- private val rightType: TypeInformation[Row],
- private val genJoinFuncName: String,
- private val genJoinFuncCode: String)
- extends CoProcessFunction[CRow, CRow, CRow]
- with Compiler[FlatJoinFunction[Row, Row, Row]]
- with Logging {
-
- private var cRowWrapper: CRowWrappingCollector = _
-
- // the join function for other conditions
- private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
-
- // cache to store rows from the left stream
- private var leftCache: MapState[Long, JList[Row]] = _
- // cache to store rows from the right stream
- private var rightCache: MapState[Long, JList[Row]] = _
-
- // state to record the timer on the left stream. 0 means no timer set
- private var leftTimerState: ValueState[Long] = _
- // state to record the timer on the right stream. 0 means no timer set
- private var rightTimerState: ValueState[Long] = _
-
- protected val leftRelativeSize: Long = -leftLowerBound
- protected val rightRelativeSize: Long = leftUpperBound
-
- // Points in time until which the respective cache has been cleaned.
- private var leftExpirationTime: Long = 0L
- private var rightExpirationTime: Long = 0L
-
- // Current time on the respective input stream.
- protected var leftOperatorTime: Long = 0L
- protected var rightOperatorTime: Long = 0L
-
- // Minimum interval by which state is cleaned up
- private val minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
-
- if (allowedLateness < 0) {
- throw new IllegalArgumentException("The allowed lateness must be non-negative.")
- }
-
- override def open(config: Configuration) {
- LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
- s"Code:\n$genJoinFuncCode")
- val clazz = compile(
- getRuntimeContext.getUserCodeClassLoader,
- genJoinFuncName,
- genJoinFuncCode)
- LOG.debug("Instantiating JoinFunction.")
- joinFunction = clazz.newInstance()
-
- cRowWrapper = new CRowWrappingCollector()
- cRowWrapper.setChange(true)
-
- // Initialize the data caches.
- val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType)
- val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](
- "InnerJoinLeftCache",
- Types.LONG.asInstanceOf[TypeInformation[Long]],
- leftListTypeInfo)
- leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
-
- val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType)
- val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](
- "InnerJoinRightCache",
- Types.LONG.asInstanceOf[TypeInformation[Long]],
- rightListTypeInfo)
- rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
-
- // Initialize the timer states.
- val leftTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long]("InnerJoinLeftTimerState", classOf[Long])
- leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
-
- val rightTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long]("InnerJoinRightTimerState", classOf[Long])
- rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
- }
-
- /**
- * Process rows from the left stream.
- */
- override def processElement1(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
-
- updateOperatorTime(ctx)
- val leftRow = cRowValue.row
- val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
- val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
- val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
- cRowWrapper.out = out
-
- // Check if we need to cache the current row.
- if (rightOperatorTime < rightQualifiedUpperBound) {
- // Operator time of right stream has not exceeded the upper window bound of the current
- // row. Put it into the left cache, since later coming records from the right stream are
- // expected to be joined with it.
- var leftRowList = leftCache.get(timeForLeftRow)
- if (null == leftRowList) {
- leftRowList = new util.ArrayList[Row](1)
- }
- leftRowList.add(leftRow)
- leftCache.put(timeForLeftRow, leftRowList)
- if (rightTimerState.value == 0) {
- // Register a timer on the RIGHT stream to remove rows.
- registerCleanUpTimer(ctx, timeForLeftRow, leftRow = true)
- }
- }
- // Check if we need to join the current row against cached rows of the right input.
- // The condition here should be rightMinimumTime < rightQualifiedUpperBound.
- // We use rightExpirationTime as an approximation of the rightMinimumTime here,
- // since rightExpirationTime <= rightMinimumTime is always true.
- if (rightExpirationTime < rightQualifiedUpperBound) {
- // Upper bound of current join window has not passed the cache expiration time yet.
- // There might be qualifying rows in the cache that the current row needs to be joined with.
- rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
- // Join the leftRow with rows from the right cache.
- val rightIterator = rightCache.iterator()
- while (rightIterator.hasNext) {
- val rightEntry = rightIterator.next
- val rightTime = rightEntry.getKey
- if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
- val rightRows = rightEntry.getValue
- var i = 0
- while (i < rightRows.size) {
- joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
- i += 1
- }
- }
-
- if (rightTime <= rightExpirationTime) {
- // eager remove
- rightIterator.remove()
- }// We could do the short-cutting optimization here once we get a state with ordered keys.
- }
- }
- }
-
- /**
- * Process rows from the right stream.
- */
- override def processElement2(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
-
- updateOperatorTime(ctx)
- val rightRow = cRowValue.row
- val timeForRightRow: Long = getTimeForRightStream(ctx, rightRow)
- val leftQualifiedLowerBound: Long = timeForRightRow - leftRelativeSize
- val leftQualifiedUpperBound: Long = timeForRightRow + rightRelativeSize
- cRowWrapper.out = out
-
- // Check if we need to cache the current row.
- if (leftOperatorTime < leftQualifiedUpperBound) {
- // Operator time of left stream has not exceeded the upper window bound of the current
- // row. Put it into the right cache, since later coming records from the left stream are
- // expected to be joined with it.
- var rightRowList = rightCache.get(timeForRightRow)
- if (null == rightRowList) {
- rightRowList = new util.ArrayList[Row](1)
- }
- rightRowList.add(rightRow)
- rightCache.put(timeForRightRow, rightRowList)
- if (leftTimerState.value == 0) {
- // Register a timer on the LEFT stream to remove rows.
- registerCleanUpTimer(ctx, timeForRightRow, leftRow = false)
- }
- }
- // Check if we need to join the current row against cached rows of the left input.
- // The condition here should be leftMinimumTime < leftQualifiedUpperBound.
- // We use leftExpirationTime as an approximation of the leftMinimumTime here,
- // since leftExpirationTime <= leftMinimumTime is always true.
- if (leftExpirationTime < leftQualifiedUpperBound) {
- leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
- // Join the rightRow with rows from the left cache.
- val leftIterator = leftCache.iterator()
- while (leftIterator.hasNext) {
- val leftEntry = leftIterator.next
- val leftTime = leftEntry.getKey
- if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
- val leftRows = leftEntry.getValue
- var i = 0
- while (i < leftRows.size) {
- joinFunction.join(leftRows.get(i), rightRow, cRowWrapper)
- i += 1
- }
- }
- if (leftTime <= leftExpirationTime) {
- // eager remove
- leftIterator.remove()
- } // We could do the short-cutting optimization here once we get a state with ordered keys.
- }
- }
- }
-
- /**
- * Called when a registered timer is fired.
- * Remove rows whose timestamps are earlier than the expiration time,
- * and register a new timer for the remaining rows.
- *
- * @param timestamp the timestamp of the timer
- * @param ctx the context to register timer or get current time
- * @param out the collector for returning result values
- */
- override def onTimer(
- timestamp: Long,
- ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
- out: Collector[CRow]): Unit = {
-
- updateOperatorTime(ctx)
- // In the future, we should separate the left and right watermarks. Otherwise, the
- // registered timer of the faster stream will be delayed, even if the watermarks have
- // already been emitted by the source.
- if (leftTimerState.value == timestamp) {
- rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
- removeExpiredRows(
- rightExpirationTime,
- rightCache,
- leftTimerState,
- ctx,
- removeLeft = false
- )
- }
-
- if (rightTimerState.value == timestamp) {
- leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
- removeExpiredRows(
- leftExpirationTime,
- leftCache,
- rightTimerState,
- ctx,
- removeLeft = true
- )
- }
- }
-
- /**
- * Calculate the expiration time with the given operator time and relative window size.
- *
- * @param operatorTime the operator time
- * @param relativeSize the relative window size
- * @return the expiration time for cached rows
- */
- private def calExpirationTime(operatorTime: Long, relativeSize: Long): Long = {
- if (operatorTime < Long.MaxValue) {
- operatorTime - relativeSize - allowedLateness - 1
- } else {
- // When operatorTime = Long.MaxValue, it means the stream has reached the end.
- Long.MaxValue
- }
- }
-
- /**
- * Register a timer for cleaning up rows in a specified time.
- *
- * @param ctx the context to register timer
- * @param rowTime time for the input row
- * @param leftRow whether this row comes from the left stream
- */
- private def registerCleanUpTimer(
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- rowTime: Long,
- leftRow: Boolean): Unit = {
- if (leftRow) {
- val cleanupTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1
- registerTimer(ctx, cleanupTime)
- rightTimerState.update(cleanupTime)
- } else {
- val cleanupTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 1
- registerTimer(ctx, cleanupTime)
- leftTimerState.update(cleanupTime)
- }
- }
-
- /**
- * Remove the expired rows. Register a new timer if the cache still holds valid rows
- * after the cleaning up.
- *
- * @param expirationTime the expiration time for this cache
- * @param rowCache the row cache
- * @param timerState timer state for the opposite stream
- * @param ctx the context to register the cleanup timer
- * @param removeLeft whether to remove the left rows
- */
- private def removeExpiredRows(
- expirationTime: Long,
- rowCache: MapState[Long, JList[Row]],
- timerState: ValueState[Long],
- ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
- removeLeft: Boolean): Unit = {
-
- val keysIterator = rowCache.keys().iterator()
-
- // -1 is the sentinel for "no remaining (non-expired) row found yet".
- var earliestTimestamp: Long = -1L
- var rowTime: Long = 0L
-
- // We remove all expired keys and do not leave the loop early.
- // Hence, we do a full pass over the state.
- while (keysIterator.hasNext) {
- rowTime = keysIterator.next
- if (rowTime <= expirationTime) {
- keysIterator.remove()
- } else {
- // We find the earliest timestamp that is still valid.
- if (rowTime < earliestTimestamp || earliestTimestamp < 0) {
- earliestTimestamp = rowTime
- }
- }
- }
-
- // NOTE(review): if the earliest remaining row has timestamp exactly 0, this check fails and
- // the else-branch clears the whole cache, including that still-valid row. `>= 0` (or a
- // separate "found" flag) looks intended here; confirm.
- if (earliestTimestamp > 0) {
- // There are rows left in the cache. Register a timer to expire them later.
- registerCleanUpTimer(
- ctx,
- earliestTimestamp,
- removeLeft)
- } else {
- // No rows left in the cache. Clear the states and the timerState will be 0.
- timerState.clear()
- rowCache.clear()
- }
- }
-
- /**
- * Update the operator time of the two streams.
- * Must be the first call in all processing methods (i.e., processElement(), onTimer()).
- *
- * @param ctx the context to acquire watermarks
- */
- def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit
-
- /**
- * Return the time for the target row from the left stream.
- *
- * Requires that [[updateOperatorTime()]] has been called before.
- *
- * @param context the runtime context
- * @param row the target row
- * @return time for the target row
- */
- def getTimeForLeftStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
-
- /**
- * Return the time for the target row from the right stream.
- *
- * Requires that [[updateOperatorTime()]] has been called before.
- *
- * @param context the runtime context
- * @param row the target row
- * @return time for the target row
- */
- def getTimeForRightStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
-
- /**
- * Register a proctime or rowtime timer.
- *
- * @param ctx the context to register the timer
- * @param cleanupTime timestamp for the timer
- */
- def registerTimer(
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- cleanupTime: Long): Unit
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
new file mode 100644
index 0000000..b5546db
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * A CoProcessFunction to execute time-bounded stream inner-joins and left/right/full outer-joins.
+ * Two kinds of time criteria:
+ * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X" where
+ * X and Y might be negative or positive and X <= Y.
+ *
+ * @param joinType the join type (inner or left/right/full outer)
+ * @param leftLowerBound the lower bound for the left stream (X in the criteria)
+ * @param leftUpperBound the upper bound for the left stream (Y in the criteria)
+ * @param allowedLateness the lateness allowed for the two streams
+ * @param leftType the input type of left stream
+ * @param rightType the input type of right stream
+ * @param genJoinFuncName the name of the generated function
+ * @param genJoinFuncCode the code of function to evaluate the non-window join conditions
+ *
+ */
+abstract class TimeBoundedStreamJoin(
+ private val joinType: JoinType,
+ private val leftLowerBound: Long,
+ private val leftUpperBound: Long,
+ private val allowedLateness: Long,
+ private val leftType: TypeInformation[Row],
+ private val rightType: TypeInformation[Row],
+ private val genJoinFuncName: String,
+ private val genJoinFuncCode: String)
+ extends CoProcessFunction[CRow, CRow, CRow]
+ with Compiler[FlatJoinFunction[Row, Row, Row]]
+ with Logging {
+
+ // Builds the null-padded rows that outer joins emit for rows without a join partner.
+ private val paddingUtil: OuterJoinPaddingUtil =
+ new OuterJoinPaddingUtil(leftType.getArity, rightType.getArity)
+
+ // Output collector that also reports (via `emitted`, cleared by `reset()`) whether the join
+ // function produced a result for the last join call.
+ private var joinCollector: EmitAwareCollector = _
+
+ // the join function for other conditions
+ private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+ // Cache of left rows keyed by row time. Each row is paired with a flag that is set once the
+ // row has been part of an emitted join result (used to decide on outer-join null padding).
+ private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
+ // Cache of right rows keyed by row time, with the same "already joined" flag per row.
+ private var rightCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
+
+ // Time of the pending clean-up timer for the RIGHT cache (set when right rows are cached).
+ // 0 means no timer set
+ private var leftTimerState: ValueState[Long] = _
+ // Time of the pending clean-up timer for the LEFT cache (set when left rows are cached).
+ // 0 means no timer set
+ private var rightTimerState: ValueState[Long] = _
+
+ // A left row at time t joins right rows in [t - rightRelativeSize, t + leftRelativeSize];
+ // a right row at time t joins left rows in [t - leftRelativeSize, t + rightRelativeSize].
+ protected val leftRelativeSize: Long = -leftLowerBound
+ protected val rightRelativeSize: Long = leftUpperBound
+
+ // Points in time until which the respective cache has been cleaned.
+ private var leftExpirationTime: Long = 0L
+ private var rightExpirationTime: Long = 0L
+
+ // Current time on the respective input stream.
+ protected var leftOperatorTime: Long = 0L
+ protected var rightOperatorTime: Long = 0L
+
+ // Minimum interval by which state is cleaned up; added to every clean-up timer's time
+ // (half of the combined relative window size).
+ private val minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
+
+ // Reject a negative lateness at construction time.
+ if (allowedLateness < 0) {
+ throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+ }
+
+  /**
+   * Compiles and instantiates the generated join function, sets up the emit-aware output
+   * collector, and registers the keyed state of this operator: one row cache and one
+   * clean-up timer state per input.
+   *
+   * @param config the operator configuration (unused)
+   */
+  override def open(config: Configuration): Unit = {
+    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n Code:\n$genJoinFuncCode")
+    val joinFunctionClass = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+    LOG.debug("Instantiating JoinFunction.")
+    joinFunction = joinFunctionClass.newInstance()
+
+    joinCollector = new EmitAwareCollector()
+    joinCollector.setCRowChange(true)
+
+    // Row cache: row time -> rows carrying that time, each paired with a flag that is set once
+    // the row has been part of an emitted join result.
+    def rowCacheFor(
+        stateName: String,
+        rowType: TypeInformation[Row]): MapState[Long, JList[JTuple2[Row, Boolean]]] = {
+      val entryType = new TupleTypeInfo(rowType, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+        .asInstanceOf[TypeInformation[JTuple2[Row, Boolean]]]
+      val listType: TypeInformation[JList[JTuple2[Row, Boolean]]] =
+        new ListTypeInfo[JTuple2[Row, Boolean]](entryType)
+      getRuntimeContext.getMapState(
+        new MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]](
+          stateName,
+          Types.LONG.asInstanceOf[TypeInformation[Long]],
+          listType))
+    }
+
+    // Timer state: time of the pending clean-up timer, 0 while none is set.
+    def timerStateFor(stateName: String): ValueState[Long] =
+      getRuntimeContext.getState(new ValueStateDescriptor[Long](stateName, classOf[Long]))
+
+    leftCache = rowCacheFor("WindowJoinLeftCache", leftType)
+    rightCache = rowCacheFor("WindowJoinRightCache", rightType)
+    leftTimerState = timerStateFor("WindowJoinLeftTimerState")
+    rightTimerState = timerStateFor("WindowJoinRightTimerState")
+  }
+
+  /**
+   * Process rows from the left stream.
+   *
+   * The row is first joined against every cached right row whose time lies in
+   * [time - rightRelativeSize, time + leftRelativeSize]. Along the way, right rows at or before
+   * the right expiration time are removed; for RIGHT/FULL OUTER joins, those that never joined
+   * are emitted padded with nulls. Afterwards the left row is cached if the right input may
+   * still deliver rows that join it. Otherwise, for LEFT/FULL OUTER joins, it is emitted
+   * padded with nulls when it produced no join result.
+   */
+  override def processElement1(
+      cRowValue: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    joinCollector.innerCollector = out
+    updateOperatorTime(ctx)
+
+    val leftRow = cRowValue.row
+    val leftTime: Long = getTimeForLeftStream(ctx, leftRow)
+    val lowerBound: Long = leftTime - rightRelativeSize
+    val upperBound: Long = leftTime + leftRelativeSize
+    val padsRight = joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER
+    var leftRowJoined = false
+
+    // rightExpirationTime never exceeds the smallest cached right time, so it stands in for it:
+    // once it is no longer below the upper bound, no cached right row can qualify.
+    if (rightExpirationTime < upperBound) {
+      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
+      val entries = rightCache.iterator()
+      while (entries.hasNext) {
+        val entry = entries.next
+        val rightTime = entry.getKey
+
+        if (lowerBound <= rightTime && rightTime <= upperBound) {
+          val cachedRows = entry.getValue
+          var marked = false
+          for (idx <- 0 until cachedRows.size) {
+            joinCollector.reset()
+            val cached = cachedRows.get(idx)
+            joinFunction.join(leftRow, cached.f0, joinCollector)
+            val producedOutput = joinCollector.emitted
+            leftRowJoined = leftRowJoined || producedOutput
+            // Remember, for null padding later on, that this right row has been joined.
+            if (padsRight && !cached.f1 && producedOutput) {
+              cached.f1 = true
+              marked = true
+            }
+          }
+          // Persist the updated "joined" flags back into the right cache.
+          if (marked) entry.setValue(cachedRows)
+        }
+
+        if (rightTime <= rightExpirationTime) {
+          if (padsRight) {
+            val expiredRows = entry.getValue
+            for (idx <- 0 until expiredRows.size) {
+              val expired = expiredRows.get(idx)
+              // A right row that never joined is emitted padded with nulls.
+              if (!expired.f1) joinCollector.collect(paddingUtil.padRight(expired.f0))
+            }
+          }
+          // Eagerly drop the expired entry. With key-ordered state we could stop early here.
+          entries.remove()
+        }
+      }
+    }
+
+    if (rightOperatorTime < upperBound) {
+      // The right input may still deliver rows that join this one: cache it with its flag.
+      val sameTimeRows = Option(leftCache.get(leftTime))
+        .getOrElse(new util.ArrayList[JTuple2[Row, Boolean]](1))
+      sameTimeRows.add(JTuple2.of(leftRow, leftRowJoined))
+      leftCache.put(leftTime, sameTimeRows)
+      if (rightTimerState.value == 0) {
+        // No clean-up timer is pending for the left cache yet.
+        registerCleanUpTimer(ctx, leftTime, leftRow = true)
+      }
+    } else if (!leftRowJoined &&
+        (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER)) {
+      // Neither cached nor joined: emit the left row padded with nulls right away.
+      joinCollector.collect(paddingUtil.padLeft(leftRow))
+    }
+  }
+
+  /**
+   * Process rows from the right stream.
+   *
+   * The row is first joined against every cached left row whose time lies in
+   * [time - leftRelativeSize, time + rightRelativeSize]. Along the way, left rows at or before
+   * the left expiration time are removed; for LEFT/FULL OUTER joins, those that never joined
+   * are emitted padded with nulls. Afterwards the right row is cached if the left input may
+   * still deliver rows that join it. Otherwise, for RIGHT/FULL OUTER joins, it is emitted
+   * padded with nulls when it produced no join result.
+   */
+  override def processElement2(
+      cRowValue: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    joinCollector.innerCollector = out
+    updateOperatorTime(ctx)
+    val rightRow = cRowValue.row
+    val timeForRightRow: Long = getTimeForRightStream(ctx, rightRow)
+    val leftQualifiedLowerBound: Long = timeForRightRow - leftRelativeSize
+    val leftQualifiedUpperBound: Long = timeForRightRow + rightRelativeSize
+    val padsLeft = joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER
+    var emitted: Boolean = false
+
+    // Check if we need to join the current row against cached rows of the left input.
+    // The condition here should be leftMinimumTime < leftQualifiedUpperBound.
+    // We use leftExpirationTime as an approximation of the leftMinimumTime here,
+    // since leftExpirationTime <= leftMinimumTime is always true.
+    if (leftExpirationTime < leftQualifiedUpperBound) {
+      leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
+      // Join the rightRow with rows from the left cache.
+      val leftIterator = leftCache.iterator()
+      while (leftIterator.hasNext) {
+        val leftEntry = leftIterator.next
+        val leftTime = leftEntry.getKey
+        if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
+          val leftRows = leftEntry.getValue
+          var i = 0
+          var entryUpdated = false
+          while (i < leftRows.size) {
+            joinCollector.reset()
+            val tuple = leftRows.get(i)
+            joinFunction.join(tuple.f0, rightRow, joinCollector)
+            emitted ||= joinCollector.emitted
+            if (padsLeft && !tuple.f1 && joinCollector.emitted) {
+              // Mark the left row as being successfully joined and emitted.
+              tuple.f1 = true
+              entryUpdated = true
+            }
+            i += 1
+          }
+          if (entryUpdated) {
+            // Write back the edited entry (mark emitted) for the left cache.
+            leftEntry.setValue(leftRows)
+          }
+        }
+        if (leftTime <= leftExpirationTime) {
+          if (padsLeft) {
+            val leftRows = leftEntry.getValue
+            var i = 0
+            while (i < leftRows.size) {
+              val tuple = leftRows.get(i)
+              if (!tuple.f1) {
+                // Emit a null padding result if the left row has never been successfully joined.
+                joinCollector.collect(paddingUtil.padLeft(tuple.f0))
+              }
+              i += 1
+            }
+          }
+          // Eager remove. We could do the short-cutting optimization here once we get a state
+          // with ordered keys.
+          leftIterator.remove()
+        }
+      }
+    }
+
+    // Check if we need to cache the current row.
+    if (leftOperatorTime < leftQualifiedUpperBound) {
+      // Operator time of left stream has not exceeded the upper window bound of the current
+      // row. Put it into the right cache, since later coming records from the left stream are
+      // expected to be joined with it.
+      var rightRowList = rightCache.get(timeForRightRow)
+      if (null == rightRowList) {
+        rightRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
+      }
+      rightRowList.add(JTuple2.of(rightRow, emitted))
+      rightCache.put(timeForRightRow, rightRowList)
+      if (leftTimerState.value == 0) {
+        // Register a timer on the LEFT stream to remove rows.
+        registerCleanUpTimer(ctx, timeForRightRow, leftRow = false)
+      }
+    } else if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
+      if (!emitted) {
+        // Emit a null padding result: the right row is not cached and has not been joined.
+        joinCollector.collect(paddingUtil.padRight(rightRow))
+      }
+    }
+  }
+
+  /**
+   * Called when a registered timer is fired. If the timestamp matches the timer recorded for a
+   * cache, rows of that cache at or before its new expiration time are removed (emitting
+   * null-padded results for never-joined rows of outer joins). A new timer is registered for
+   * any rows that remain.
+   *
+   * @param timestamp the timestamp of the timer
+   * @param ctx the context to register timer or get current time
+   * @param out the collector for returning result values
+   */
+  override def onTimer(
+      timestamp: Long,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    joinCollector.innerCollector = out
+    updateOperatorTime(ctx)
+
+    // Both inputs share one watermark for now. Once they are separated, the timer of the faster
+    // stream would no longer be held back by the slower one.
+
+    // leftTimerState tracks the clean-up timer of the RIGHT cache.
+    val rightCacheTimerFired = leftTimerState.value == timestamp
+    if (rightCacheTimerFired) {
+      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
+      removeExpiredRows(
+        collector = joinCollector,
+        expirationTime = rightExpirationTime,
+        rowCache = rightCache,
+        timerState = leftTimerState,
+        ctx = ctx,
+        removeLeft = false)
+    }
+
+    // rightTimerState tracks the clean-up timer of the LEFT cache.
+    val leftCacheTimerFired = rightTimerState.value == timestamp
+    if (leftCacheTimerFired) {
+      leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
+      removeExpiredRows(
+        collector = joinCollector,
+        expirationTime = leftExpirationTime,
+        rowCache = leftCache,
+        timerState = rightTimerState,
+        ctx = ctx,
+        removeLeft = true)
+    }
+  }
+
+  /**
+   * Computes the time up to which (inclusive) cached rows may be dropped, given the operator
+   * time and the relative window size of the cache. An operator time of Long.MaxValue marks
+   * the end of the input, in which case every cached row expires.
+   *
+   * @param operatorTime the operator time
+   * @param relativeSize the relative window size
+   * @return the expiration time for cached rows
+   */
+  private def calExpirationTime(operatorTime: Long, relativeSize: Long): Long =
+    if (operatorTime == Long.MaxValue) {
+      // The stream has reached its end.
+      Long.MaxValue
+    } else {
+      operatorTime - relativeSize - allowedLateness - 1
+    }
+
+  /**
+   * Registers the clean-up timer for the cache that holds a row of the given time. It records
+   * the timer's time in the opposite stream's timer state: left rows go to rightTimerState,
+   * right rows to leftTimerState.
+   *
+   * @param ctx the context to register timer
+   * @param rowTime time for the input row
+   * @param leftRow whether this row comes from the left stream
+   */
+  private def registerCleanUpTimer(
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      rowTime: Long,
+      leftRow: Boolean): Unit = {
+    val (relativeSize, trackingState) =
+      if (leftRow) (leftRelativeSize, rightTimerState) else (rightRelativeSize, leftTimerState)
+    val cleanupTime = rowTime + relativeSize + minCleanUpInterval + allowedLateness + 1
+    registerTimer(ctx, cleanupTime)
+    trackingState.update(cleanupTime)
+  }
+
+ /**
+  * Removes all expired rows from a cache. For the preserved side of an outer join, a
+  * null-padded result is emitted for every expired row that was never joined.
+  * If valid rows remain afterwards, a new cleanup timer is registered for the earliest
+  * of them; otherwise the cache and its timer state are cleared.
+  *
+  * @param collector      the collector to emit results
+  * @param expirationTime rows with a time less than or equal to this value are expired
+  * @param rowCache       the row cache
+  * @param timerState     timer state for the opposite stream
+  * @param ctx            the context to register the cleanup timer
+  * @param removeLeft     whether to remove the left rows
+  */
+ private def removeExpiredRows(
+     collector: Collector[Row],
+     expirationTime: Long,
+     rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
+     timerState: ValueState[Long],
+     ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+     removeLeft: Boolean): Unit = {
+
+   // Only the preserved side(s) of an outer join emit null-padded results for rows that
+   // expire without ever having been joined. This is invariant across the loop.
+   val emitPadding =
+     if (removeLeft) {
+       joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER
+     } else {
+       joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER
+     }
+
+   val iterator = rowCache.iterator()
+
+   // Use an explicit flag instead of a sentinel timestamp: row times may legitimately be
+   // 0 or negative, and a sentinel such as -1 would drop such still-valid rows.
+   var hasValidRows = false
+   var earliestTimestamp: Long = Long.MaxValue
+
+   // We remove all expired keys and do not leave the loop early.
+   // Hence, we do a full pass over the state.
+   while (iterator.hasNext) {
+     val entry = iterator.next
+     val rowTime = entry.getKey
+     if (rowTime <= expirationTime) {
+       if (emitPadding) {
+         val rows = entry.getValue
+         var i = 0
+         while (i < rows.size) {
+           val tuple = rows.get(i)
+           if (!tuple.f1) {
+             // Emit a null padding result if the row has never been successfully joined.
+             collector.collect(
+               if (removeLeft) paddingUtil.padLeft(tuple.f0) else paddingUtil.padRight(tuple.f0))
+           }
+           i += 1
+         }
+       }
+       iterator.remove()
+     } else if (!hasValidRows || rowTime < earliestTimestamp) {
+       // Track the earliest timestamp that is still valid.
+       hasValidRows = true
+       earliestTimestamp = rowTime
+     }
+   }
+
+   if (hasValidRows) {
+     // There are rows left in the cache. Register a timer to expire them later.
+     registerCleanUpTimer(ctx, earliestTimestamp, removeLeft)
+   } else {
+     // No rows left in the cache. Clear the states and the timerState will be 0.
+     timerState.clear()
+     rowCache.clear()
+   }
+ }
+
+ /**
+ * Update the operator time of the two streams.
+ * Must be the first call in all processing methods (i.e., processElement(), onTimer()).
+ *
+ * NOTE(review): implementations are presumably expected to refresh `leftOperatorTime` and
+ * `rightOperatorTime`; onTimer() reads both right after this call to compute expiration
+ * times. Confirm against the row-time / proc-time subclasses.
+ *
+ * @param ctx the context from which the current time (watermark or processing time,
+ *            depending on the implementation) is acquired
+ */
+ def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit
+
+ /**
+ * Return the time for the target row from the left stream.
+ *
+ * Requires that [[updateOperatorTime()]] has been called before.
+ * Whether this is an event-time attribute of the row or the current processing time is
+ * decided by the concrete implementation.
+ *
+ * @param context the runtime context
+ * @param row the target row
+ * @return time for the target row
+ */
+ def getTimeForLeftStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
+
+ /**
+ * Return the time for the target row from the right stream.
+ *
+ * Requires that [[updateOperatorTime()]] has been called before.
+ * Whether this is an event-time attribute of the row or the current processing time is
+ * decided by the concrete implementation.
+ *
+ * @param context the runtime context
+ * @param row the target row
+ * @return time for the target row
+ */
+ def getTimeForRightStream(context: CoProcessFunction[CRow, CRow, CRow]#Context, row: Row): Long
+
+ /**
+ * Register a proctime or rowtime timer.
+ *
+ * Called by registerCleanUpTimer(). When the timer fires, onTimer() matches its timestamp
+ * against `leftTimerState` / `rightTimerState` to decide which cache to expire.
+ *
+ * @param ctx the context to register the timer
+ * @param cleanupTime timestamp for the timer
+ */
+ def registerTimer(
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ cleanupTime: Long): Unit
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/222e6945/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index c14b698..cef19c1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -34,6 +34,7 @@ class JoinTest extends TableTestBase {
streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+ // Tests for inner join
@Test
def testProcessingTimeInnerJoinWithOnClause(): Unit = {
@@ -379,6 +380,280 @@ class JoinTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, expected)
}
+ // Tests for left outer join
+ @Test
+ def testProcTimeLeftOuterJoin(): Unit = {
+   // Left input only needs the join key and its time attribute.
+   val leftInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(0),
+     term("select", "a", "proctime"))
+   // Right input additionally forwards the projected column b.
+   val rightInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(1),
+     term("select", "a", "b", "proctime"))
+   val windowJoin = binaryNode(
+     "DataStreamWindowJoin",
+     leftInput,
+     rightInput,
+     term("where",
+       "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+         "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+     term("join", "a, proctime, a0, b, proctime0"),
+     term("joinType", "LeftOuterJoin"))
+   val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+   val sqlQuery =
+     """
+       |SELECT t1.a, t2.b
+       |FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
+       | t1.a = t2.a AND
+       | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+       |""".stripMargin
+
+   streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testRowTimeLeftOuterJoin(): Unit = {
+   // Left input only needs the join key and its rowtime attribute.
+   val leftInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(0),
+     term("select", "a", "c"))
+   // Right input additionally forwards the projected column b.
+   val rightInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(1),
+     term("select", "a", "b", "c"))
+   val windowJoin = binaryNode(
+     "DataStreamWindowJoin",
+     leftInput,
+     rightInput,
+     term("where",
+       "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+         "<=(c, DATETIME_PLUS(c0, 3600000)))"),
+     term("join", "a, c, a0, b, c0"),
+     term("joinType", "LeftOuterJoin"))
+   val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+   val sqlQuery =
+     """
+       |SELECT t1.a, t2.b
+       |FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
+       | t1.a = t2.a AND
+       | t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+       |""".stripMargin
+
+   streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ // Tests for right outer join
+ @Test
+ def testProcTimeRightOuterJoin(): Unit = {
+   // Left input only needs the join key and its time attribute.
+   val leftInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(0),
+     term("select", "a", "proctime"))
+   // Right input additionally forwards the projected column b.
+   val rightInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(1),
+     term("select", "a", "b", "proctime"))
+   val windowJoin = binaryNode(
+     "DataStreamWindowJoin",
+     leftInput,
+     rightInput,
+     term("where",
+       "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+         "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+     term("join", "a, proctime, a0, b, proctime0"),
+     term("joinType", "RightOuterJoin"))
+   val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+   val sqlQuery =
+     """
+       |SELECT t1.a, t2.b
+       |FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
+       | t1.a = t2.a AND
+       | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+       |""".stripMargin
+
+   streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testRowTimeRightOuterJoin(): Unit = {
+   // Left input only needs the join key and its rowtime attribute.
+   val leftInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(0),
+     term("select", "a", "c"))
+   // Right input additionally forwards the projected column b.
+   val rightInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(1),
+     term("select", "a", "b", "c"))
+   val windowJoin = binaryNode(
+     "DataStreamWindowJoin",
+     leftInput,
+     rightInput,
+     term("where",
+       "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+         "<=(c, DATETIME_PLUS(c0, 3600000)))"),
+     term("join", "a, c, a0, b, c0"),
+     term("joinType", "RightOuterJoin"))
+   val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+   val sqlQuery =
+     """
+       |SELECT t1.a, t2.b
+       |FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
+       | t1.a = t2.a AND
+       | t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+       |""".stripMargin
+
+   streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ // Tests for full outer join
+ @Test
+ def testProcTimeFullOuterJoin(): Unit = {
+   // Left input only needs the join key and its time attribute.
+   val leftInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(0),
+     term("select", "a", "proctime"))
+   // Right input additionally forwards the projected column b.
+   val rightInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(1),
+     term("select", "a", "b", "proctime"))
+   val windowJoin = binaryNode(
+     "DataStreamWindowJoin",
+     leftInput,
+     rightInput,
+     term("where",
+       "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+         "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+     term("join", "a, proctime, a0, b, proctime0"),
+     term("joinType", "FullOuterJoin"))
+   val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+   val sqlQuery =
+     """
+       |SELECT t1.a, t2.b
+       |FROM MyTable t1 Full OUTER JOIN MyTable2 t2 ON
+       | t1.a = t2.a AND
+       | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR
+       |""".stripMargin
+
+   streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testRowTimeFullOuterJoin(): Unit = {
+   // Left input only needs the join key and its rowtime attribute.
+   val leftInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(0),
+     term("select", "a", "c"))
+   // Right input additionally forwards the projected column b.
+   val rightInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(1),
+     term("select", "a", "b", "c"))
+   val windowJoin = binaryNode(
+     "DataStreamWindowJoin",
+     leftInput,
+     rightInput,
+     term("where",
+       "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+         "<=(c, DATETIME_PLUS(c0, 3600000)))"),
+     term("join", "a, c, a0, b, c0"),
+     term("joinType", "FullOuterJoin"))
+   val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b"))
+
+   val sqlQuery =
+     """
+       |SELECT t1.a, t2.b
+       |FROM MyTable t1 FULL OUTER JOIN MyTable2 t2 ON
+       | t1.a = t2.a AND
+       | t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+       |""".stripMargin
+
+   streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ // Test for outer join optimization
+ @Test
+ def testOuterJoinOpt(): Unit = {
+   // Both inputs keep a, b and c because the LIKE predicate references b on both sides.
+   val leftInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(0),
+     term("select", "a", "b", "c"))
+   val rightInput = unaryNode(
+     "DataStreamCalc",
+     streamTableNode(1),
+     term("select", "a", "b", "c"))
+   val windowJoin = binaryNode(
+     "DataStreamWindowJoin",
+     leftInput,
+     rightInput,
+     term("where",
+       "AND(=(a, a0), >=(c, -(c0, 10000)), " +
+         "<=(c, DATETIME_PLUS(c0, 3600000)), LIKE(b, b0))"),
+     term("join", "a, b, c, a0, b0, c0"),
+     // The WHERE clause filters on b and b0 after the join, so the planner rewrites the
+     // full outer join into an inner join.
+     term("joinType", "InnerJoin"))
+   val expected = unaryNode("DataStreamCalc", windowJoin, term("select", "a", "b0 AS b"))
+
+   val sqlQuery =
+     """
+       |SELECT t1.a, t2.b
+       |FROM MyTable t1 FULL OUTER JOIN MyTable2 t2 ON
+       | t1.a = t2.a AND
+       | t1.c BETWEEN t2.c - INTERVAL '10' SECOND AND t2.c + INTERVAL '1' HOUR
+       | WHERE t1.b LIKE t2.b
+       |""".stripMargin
+
+   streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ // Other tests
@Test
def testJoinTimeBoundary(): Unit = {
verifyTimeBoundary(