You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/03/06 04:47:00 UTC
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window
left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387275#comment-16387275 ]
ASF GitHub Bot commented on FLINK-8428:
---------------------------------------
Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5327#discussion_r172408177
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Connect data for left stream and right stream. Base class for stream-stream non-window Join.
+ *
+ * @param leftType the input type of left stream
+ * @param rightType the input type of right stream
+ * @param resultType the output type of join
+ * @param genJoinFuncName the function name of other non-equi condition
+ * @param genJoinFuncCode the function code of other non-equi condition
+ * @param queryConfig the configuration for the query to generate
+ */
+abstract class NonWindowJoin(
+ leftType: TypeInformation[Row],
+ rightType: TypeInformation[Row],
+ resultType: TypeInformation[CRow],
+ genJoinFuncName: String,
+ genJoinFuncCode: String,
+ queryConfig: StreamQueryConfig)
+ extends CoProcessFunction[CRow, CRow, CRow]
+ with Compiler[FlatJoinFunction[Row, Row, Row]]
+ with Logging {
+
+ // check if input types implement proper equals/hashCode
+ // (rows of these types are used as the keys of the MapStates declared below)
+ validateEqualsHashCode("join", leftType)
+ validateEqualsHashCode("join", rightType)
+
+ // state to hold left stream element
+ // key: the row; value: (number of occurrences of the row, expiration time of the row)
+ // initialized in open()
+ protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+ // state to hold right stream element
+ // same layout as leftState; initialized in open()
+ protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+ // collector wrapper instantiated in open(); its usage is not visible in this part of the file
+ protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+ // idle state retention bounds taken from the query configuration
+ protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+ protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+ // state clean-up (see onTimer / getNewExpiredTime) is only active when the configured
+ // minimum retention time is greater than 1
+ protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+ // state to record last timer of left stream, 0 means no timer
+ protected var leftTimer: ValueState[Long] = _
+ // state to record last timer of right stream, 0 means no timer
+ protected var rightTimer: ValueState[Long] = _
+
+ // other condition function
+ // compiled from genJoinFuncCode and instantiated in open()
+ protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+ /**
+ * Compiles and instantiates the generated join function and initializes the row states,
+ * the timer states and the output collector wrapper.
+ *
+ * @param parameters The configuration passed to this function on open; not read here.
+ */
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+ s"Code:\n$genJoinFuncCode")
+ // compile the generated source with the user code class loader
+ val clazz = compile(
+ getRuntimeContext.getUserCodeClassLoader,
+ genJoinFuncName,
+ genJoinFuncCode)
+ LOG.debug("Instantiating JoinFunction.")
+ joinFunction = clazz.newInstance()
+
+ // Initialize left and right state. Each state maps a row to a tuple2 whose first element
+ // is the number of occurrences of that row and whose second element is the expiration
+ // time of that row.
+ val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
+ val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
+ "left", leftType, tupleTypeInfo)
+ val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
+ "right", rightType, tupleTypeInfo)
+ leftState = getRuntimeContext.getMapState(leftStateDescriptor)
+ rightState = getRuntimeContext.getMapState(rightStateDescriptor)
+
+ // initialize timer state
+ // "timervaluestate1" backs the left timer, "timervaluestate2" backs the right timer
+ val valueStateDescriptor1 = new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+ leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
+ val valueStateDescriptor2 = new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+ rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
+
+ cRowWrapper = new CRowWrappingMultiOutputCollector()
+ }
+
+ /**
+ * Process left stream records
+ *
+ * @param valueC The input value.
+ * @param ctx The ctx to register timer or get current time
+ * @param out The collector for returning result values.
+ *
+ */
+ override def processElement1(
+ valueC: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ // Delegate to processElement (defined outside this part of the file), passing the left
+ // timer and left state first, the right state second, and flagging the input as left.
+ processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true)
+ }
+
+ /**
+ * Process right stream records
+ *
+ * @param valueC The input value.
+ * @param ctx The ctx to register timer or get current time
+ * @param out The collector for returning result values.
+ *
+ */
+ override def processElement2(
+ valueC: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ // Delegate to processElement (defined outside this part of the file), passing the right
+ // timer and right state first, the left state second, and flagging the input as right.
+ processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false)
+ }
+
+ /**
+ * Called when a processing-time timer fires.
+ * Expire left/right records which are expired in left and right state.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx The ctx to register timer or get current time
+ * @param out The collector for returning result values.
+ */
+ override def onTimer(
+     timestamp: Long,
+     ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+     out: Collector[CRow]): Unit = {
+
+   // Timers are only acted upon when state cleaning is enabled; the timer states are not
+   // even read otherwise.
+   if (stateCleaningEnabled) {
+     // The firing timer is the most recently recorded timer of the left side.
+     if (leftTimer.value == timestamp) {
+       expireOutTimeRowForLeft(timestamp, leftState, leftTimer, ctx)
+     }
+
+     // The firing timer is the most recently recorded timer of the right side.
+     if (rightTimer.value == timestamp) {
+       expireOutTimeRowForRight(timestamp, rightState, rightTimer, ctx)
+     }
+   }
+ }
+
+ /**
+ * Removes records which are expired from left state.
+ */
+ def expireOutTimeRowForLeft(curTime: Long,
+ rowMapState: MapState[Row, JTuple2[Int, Long]],
+ timerState: ValueState[Long],
+ ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+ // Plain delegation to the shared clean-up routine.
+ // NOTE(review): presumably kept as a separate method so subclasses can customize the
+ // left-side expiration -- confirm against the subclasses.
+ expireOutTimeRow(curTime, rowMapState, timerState, ctx)
+ }
+
+ /**
+ * Removes records which are expired from right state.
+ */
+ def expireOutTimeRowForRight(curTime: Long,
+ rowMapState: MapState[Row, JTuple2[Int, Long]],
+ timerState: ValueState[Long],
+ ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+ // Plain delegation to the shared clean-up routine.
+ // NOTE(review): presumably kept as a separate method so subclasses can customize the
+ // right-side expiration -- confirm against the subclasses.
+ expireOutTimeRow(curTime, rowMapState, timerState, ctx)
+ }
+
+
+ /**
+ * Computes the expiration time to store for a record.
+ *
+ * @param curProcessTime The current processing time.
+ * @param oldExpiredTime The expiration time currently associated with the record.
+ * @return curProcessTime + maxRetentionTime if state cleaning is enabled and
+ *         curProcessTime + minRetentionTime lies beyond the old expiration time;
+ *         otherwise the old expiration time unchanged.
+ */
+ def getNewExpiredTime(
+     curProcessTime: Long,
+     oldExpiredTime: Long): Long = {
+
+   // The expiration is only pushed out when cleaning is on and the record would otherwise
+   // expire before the minimum retention interval has passed.
+   val needsExtension =
+     stateCleaningEnabled && curProcessTime + minRetentionTime > oldExpiredTime
+
+   if (needsExtension) curProcessTime + maxRetentionTime else oldExpiredTime
+ }
+
+ /**
+ * Removes records which are expired from the state. Registers a new timer if the state still
+ * holds records after the clean-up.
+ */
+ def expireOutTimeRow(
+     curTime: Long,
+     rowMapState: MapState[Row, JTuple2[Int, Long]],
+     timerState: ValueState[Long],
+     ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+   val entries = rowMapState.iterator()
+   var hasLiveRecord = false
+
+   // Single pass over the state: drop every entry whose expiration time has been reached
+   // and remember whether at least one entry survives.
+   while (entries.hasNext) {
+     val expiresAt = entries.next().getValue.f1
+     if (expiresAt > curTime) {
+       hasLiveRecord = true
+     } else {
+       entries.remove()
+     }
+   }
+
+   if (!hasLiveRecord) {
+     // Nothing survived: clear the timer marker and the complete state for this input.
+     timerState.clear()
+     rowMapState.clear()
+   } else {
+     // Some records are still valid: schedule the next clean-up and record its timestamp.
+     val nextCleanup = curTime + maxRetentionTime
+     ctx.timerService.registerProcessingTimeTimer(nextCleanup)
+     timerState.update(nextCleanup)
+   }
+ }
+
+ /**
+ * Connect left row and null right row, then collect. The result is NULL from the right side,
+ * if there is no match.
+ *
+ * @param leftRow The row from left side.
+ * @param defaultRow The result row used for output, right side fields will all be null.
+ * @param out The collector for returning result values.
+ */
+ def collectWithNullRight(leftRow: Row, defaultRow: Row, out: Collector[Row]): Unit = {
--- End diff --
I was thinking about something like
```
def collectWithDefault(mainRow: Row, defaultRow: Row, out: Collector[Row]): Unit = {
//...
}
```
This way `mainRow` will be `leftRow` for a LEFT JOIN and `rightRow` for a RIGHT JOIN. It might be usable for a FULL OUTER JOIN as well.
> Implement stream-stream non-window left outer join
> --------------------------------------------------
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A simple design doc can be found [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)