Posted to issues@flink.apache.org by Xpray <gi...@git.apache.org> on 2018/05/12 15:25:36 UTC

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

GitHub user Xpray opened a pull request:

    https://github.com/apache/flink/pull/5998

    [FLINK-9344] [TableAPI & SQL] Support INTERSECT and INTERSECT ALL for streaming

    
    ## What is the purpose of the change
    Support Intersect and Intersect All for Streaming SQL and TableAPI
    
    
    ## Brief change log
    * implemented StreamIntersectCoProcessFunction
    
    
    ## Verifying this change
    
    This change adds tests and can be verified by running the intersect cases in both
    ```org.apache.flink.table.runtime.stream.sql.SetOperatorsITCase``` and
    ```org.apache.flink.table.runtime.stream.table.SetOperatorsITCase```.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`:  no
      - The serializers:  no
      - The runtime per-record code paths (performance sensitive):  no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper:  no
      - The S3 file system connector:  no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented?  not yet; it will be documented in a follow-up issue


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Xpray/flink FLINK-9344

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5998.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5998
    
----
commit 505875c9e05d0613ddc3ae304293d46780411595
Author: Xpray <le...@...>
Date:   2018-05-12T15:15:09Z

    [FLINK-9344][TableAPI & SQL] Support INTERSECT and INTERSECT ALL for streaming

----


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190123516
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala ---
    @@ -0,0 +1,236 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.setop
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +class StreamIntersectCoProcessFunction(
    +  resultType: TypeInformation[Row],
    +  queryConfig: StreamQueryConfig,
    +  all: Boolean)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +  with Logging {
    +
    +  validateEqualsHashCode("intersect", resultType)
    +
    +  // state to hold left stream element
    +  private var leftState: ValueState[JTuple2[Int, Long]] = _
    +  // state to hold right stream element
    +  private var rightState: ValueState[JTuple2[Int, Long]] = _
    +
    +  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
    +  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
    +  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
    +
    +  // state to record last timer of left stream, 0 means no timer
    +  private var leftTimer: ValueState[Long] = _
    +  // state to record last timer of right stream, 0 means no timer
    +  private var rightTimer: ValueState[Long] = _
    +
    +  private var cRowWrapper: CRowWrappingMultiOutputCollector = _
    +
    +  override def open(parameters: Configuration): Unit = {
    +    LOG.debug("Instantiating StreamIntersectCoProcessFunction.")
    +    // initialize left and right state, the first element of tuple2 indicates how many rows of
    +    // this row, while the second element represents the expired time of this row.
    +    val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
    +    val leftStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]](
    +      "left", tupleTypeInfo)
    +    val rightStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]](
    +      "right", tupleTypeInfo)
    +    leftState = getRuntimeContext.getState(leftStateDescriptor)
    +    rightState = getRuntimeContext.getState(rightStateDescriptor)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1 = new ValueStateDescriptor[Long]("leftTimer", classOf[Long])
    +    leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
    +    val valueStateDescriptor2 = new ValueStateDescriptor[Long]("rightTimer", classOf[Long])
    +    rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
    +
    +    cRowWrapper = new CRowWrappingMultiOutputCollector()
    +    //we emit one record per process at most
    +    cRowWrapper.setTimes(1)
    +  }
    +
    +  override def processElement1(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    out: Collector[CRow]): Unit = {
    +
    +    processElement(value, ctx, out, leftState, leftTimer, rightState)
    +  }
    +
    +  override def processElement2(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    out: Collector[CRow]): Unit = {
    +
    +    processElement(value, ctx, out, rightState, rightTimer, leftState)
    +  }
    +
    +  private def processElement(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    out: Collector[CRow],
    +    currentSideState: ValueState[JTuple2[Int, Long]],
    +    currentSideTimer: ValueState[Long],
    +    otherSideState: ValueState[JTuple2[Int, Long]]): Unit = {
    +
    +    val inputRow = value.row
    +    cRowWrapper.setChange(value.change)
    +    cRowWrapper.setCollector(out)
    +
    +    val cntAndExpiredTime = updateState(value, ctx, currentSideState, currentSideTimer)
    +
    +    val otherSideValue = otherSideState.value()
    +    if (otherSideValue != null) {
    +      if (all) {
    +        if (value.change && cntAndExpiredTime.f0 <= otherSideValue.f0) {
    +          cRowWrapper.collect(inputRow)
    +        } else if (!value.change && cntAndExpiredTime.f0 < otherSideValue.f0) {
    +          cRowWrapper.collect(inputRow)
    +        }
    +      } else {
    +        if (value.change && cntAndExpiredTime.f0 == 1) {
    +          cRowWrapper.collect(inputRow)
    +        } else if (!value.change && cntAndExpiredTime.f0 == 0) {
    +          cRowWrapper.collect(inputRow)
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +    * update valueState and TimerState and return the current state
    +    * @param value
    +    * @param ctx
    +    * @param state
    +    * @param timerState
    +    * @return
    +    */
    +  private def updateState(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    state: ValueState[JTuple2[Int, Long]],
    +    timerState: ValueState[Long]): JTuple2[Int, Long] = {
    +
    +    val curProcessTime = ctx.timerService.currentProcessingTime
    +    val oldCntAndExpiredTime = state.value()
    +    val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
    +      JTuple2.of(0, -1L)
    +    } else {
    +      oldCntAndExpiredTime
    +    }
    +
    +    cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
    +    if (stateCleaningEnabled && timerState.value() == 0) {
    +      timerState.update(cntAndExpiredTime.f1)
    +      ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
    +    }
    +
    +    if (!value.change) {
    +      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
    +      if (cntAndExpiredTime.f0 <= 0) {
    +        state.clear()
    +      } else {
    +        state.update(cntAndExpiredTime)
    +      }
    +    } else {
    +      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
    +      state.update(cntAndExpiredTime)
    +    }
    +    cntAndExpiredTime
    +
    +  }
    +
    +  def getNewExpiredTime(
    +   curProcessTime: Long,
    +   oldExpiredTime: Long): Long = {
    +    if (stateCleaningEnabled && curProcessTime + minRetentionTime > oldExpiredTime) {
    +      curProcessTime + maxRetentionTime
    +    } else {
    +      oldExpiredTime
    +    }
    +  }
    +
    +  override def onTimer(
    +    timestamp: Long,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +    out: Collector[CRow]): Unit = {
    +
    +    if (stateCleaningEnabled && leftTimer.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftState,
    +        leftTimer,
    +        ctx
    +      )
    +    }
    +
    +    if (stateCleaningEnabled && rightTimer.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightState,
    +        rightTimer,
    +        ctx
    +      )
    +    }
    +  }
    +
    +  private def expireOutTimeRow(
    --- End diff --
    
    I don't believe any of your tests trigger this code path.
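
For reference, the retention arithmetic that decides when this cleanup path becomes due can be modelled without Flink. The following is a minimal sketch, assuming the same rule as `getNewExpiredTime` in the diff above; `RetentionSketch`, `Retention`, and `newExpiry` are hypothetical names, and the millisecond values are illustrative only.

```scala
// Hypothetical, Flink-free model of the retention rule in getNewExpiredTime.
object RetentionSketch {
  final case class Retention(minMs: Long, maxMs: Long) {
    // Mirrors `minRetentionTime > 1` from the diff.
    val cleaningEnabled: Boolean = minMs > 1

    // Push the expiry out to now + max only when the old expiry
    // is closer than now + min; otherwise keep the old expiry.
    def newExpiry(nowMs: Long, oldExpiryMs: Long): Long =
      if (cleaningEnabled && nowMs + minMs > oldExpiryMs) nowMs + maxMs
      else oldExpiryMs
  }

  def main(args: Array[String]): Unit = {
    val r = Retention(minMs = 2L, maxMs = 4L)
    val first = r.newExpiry(nowMs = 10L, oldExpiryMs = -1L)    // fresh key: 10 + 4
    val second = r.newExpiry(nowMs = 11L, oldExpiryMs = first) // 11 + 2 is not > 14, unchanged
    val third = r.newExpiry(nowMs = 13L, oldExpiryMs = second) // 13 + 2 > 14, so 13 + 4
    println(s"$first $second $third") // prints "14 14 17"
  }
}
```

Under this model, a test would only reach the timer callback if processing time advances past the registered expiry while cleaning is enabled, which suggests the test needs small, non-zero retention values.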


---

[GitHub] flink issue #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT and INTER...

Posted by Xpray <gi...@git.apache.org>.
Github user Xpray commented on the issue:

    https://github.com/apache/flink/pull/5998
  
    Thanks for the review, @walterddr and @fhueske. I've updated the PR.


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190122671
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala ---
    @@ -0,0 +1,236 @@
    +package org.apache.flink.table.runtime.setop
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +class StreamIntersectCoProcessFunction(
    +  resultType: TypeInformation[Row],
    +  queryConfig: StreamQueryConfig,
    +  all: Boolean)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    --- End diff --
    
    I guess I am confused here:
    
    The DataStream API already supports `CoGroupedStreams` with a custom `CoGroupFunction`. It seems that if we operate on a windowed stream, we can apply the `intersect` as a `CoGroupFunction`. Is this function solely targeting the non-windowed intersect case? If so, can we rename the function to make that clear? (This also adds to my other point: please add Javadoc.)
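
To illustrate the windowed alternative raised here, the following is a minimal sketch of what the body of a co-group call could compute for one key and one window. It has no Flink dependency: `WindowedIntersectSketch` and `intersect` are hypothetical names, and the sketch assumes both streams are keyed by the whole row, so every element in a group is equal.

```scala
// Hypothetical stand-in for the body of a CoGroupFunction.coGroup call:
// both iterables hold the rows of one key that fell into the same window.
object WindowedIntersectSketch {
  def intersect[T](left: Iterable[T], right: Iterable[T], all: Boolean): Seq[T] = {
    val l = left.toSeq
    val r = right.toSeq
    if (l.isEmpty || r.isEmpty) Seq.empty          // row missing on one side: no output
    else if (all) l.take(math.min(l.size, r.size)) // INTERSECT ALL: minimum of both counts
    else l.take(1)                                 // INTERSECT: at most one copy
  }

  def main(args: Array[String]): Unit = {
    // Three copies on the left, two on the right.
    println(intersect(Seq("a", "a", "a"), Seq("a", "a"), all = true))  // two copies of "a"
    println(intersect(Seq("a", "a", "a"), Seq("a", "a"), all = false)) // one copy of "a"
    println(intersect(Seq("a"), Seq.empty[String], all = true))        // empty
  }
}
```

Because a window is bounded, this version can see both groups in full and needs neither retractions nor keyed state, which is the main difference from the non-windowed case.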


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190119548
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamIntersect.scala ---
    @@ -0,0 +1,116 @@
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.CRowKeySelector
    +import org.apache.flink.table.runtime.setop.StreamIntersectCoProcessFunction
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +import scala.collection.JavaConverters._
    +
    +class DataStreamIntersect(
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  leftNode: RelNode,
    +  rightNode: RelNode,
    +  rowRelDataType: RelDataType,
    +  all: Boolean)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with DataStreamRel {
    +
    +  private lazy val intersectType = if (all) {
    +    "All"
    --- End diff --
    
    `" All"` might be better formatting, since you only attach this in the `explainTerm` and `toString` methods.
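
The effect of the leading space can be seen in a small sketch. How the diff actually concatenates `intersectType` is not visible in the quoted hunk, so the `name` helper and the `"Intersect"` prefix below are assumptions made for illustration only.

```scala
// Hypothetical illustration: the real concatenation site is not shown in the diff.
object ExplainNameSketch {
  def name(all: Boolean, suffixForAll: String): String = {
    val intersectType = if (all) suffixForAll else ""
    s"Intersect$intersectType"
  }

  def main(args: Array[String]): Unit = {
    println(name(all = true, suffixForAll = "All"))   // IntersectAll
    println(name(all = true, suffixForAll = " All"))  // Intersect All
    println(name(all = false, suffixForAll = " All")) // Intersect
  }
}
```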


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by Xpray <gi...@git.apache.org>.
Github user Xpray commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190231602
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala ---
    @@ -0,0 +1,236 @@
    +package org.apache.flink.table.runtime.setop
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +class StreamIntersectCoProcessFunction(
    +  resultType: TypeInformation[Row],
    +  queryConfig: StreamQueryConfig,
    +  all: Boolean)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    --- End diff --
    
    Thanks for the review, @walterddr and @fhueske. This PR intends to support non-windowed intersect, just like the non-windowed inner join.
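
To make the non-windowed behaviour concrete, here is a minimal, Flink-free sketch that transcribes the emission rules of `processElement` from the diff into a per-row counter model. `NonWindowIntersectSketch`, `Model`, `onLeft`, and `onRight` are hypothetical names, rows are plain strings, and state expiry is left out. It illustrates the rules as written in the PR and is not the PR's implementation.

```scala
import scala.collection.mutable

object NonWindowIntersectSketch {
  // Per-row counts for each input; an absent key plays the role of empty keyed state.
  final class Model(all: Boolean) {
    private val left = mutable.Map.empty[String, Int]
    private val right = mutable.Map.empty[String, Int]

    // Each call returns Some((accumulate, row)) when a change would be emitted, else None.
    def onLeft(row: String, accumulate: Boolean): Option[(Boolean, String)] =
      process(row, accumulate, left, right)

    def onRight(row: String, accumulate: Boolean): Option[(Boolean, String)] =
      process(row, accumulate, right, left)

    private def process(
        row: String,
        accumulate: Boolean,
        mine: mutable.Map[String, Int],
        other: mutable.Map[String, Int]): Option[(Boolean, String)] = {
      // Update this side's count first; a count of zero or less clears the entry.
      val cnt = mine.getOrElse(row, 0) + (if (accumulate) 1 else -1)
      if (cnt <= 0) mine.remove(row) else mine.update(row, cnt)

      // Emit only when the other side currently holds the same row.
      other.get(row).filter { otherCnt =>
        if (all) (accumulate && cnt <= otherCnt) || (!accumulate && cnt < otherCnt)
        else (accumulate && cnt == 1) || (!accumulate && cnt == 0)
      }.map(_ => (accumulate, row))
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new Model(all = false)
    println(m.onLeft("a", accumulate = true))   // None: the right side has no state yet
    println(m.onRight("a", accumulate = true))  // Some((true,a)): first match
    println(m.onLeft("a", accumulate = true))   // None: duplicate on the left
    println(m.onRight("a", accumulate = false)) // Some((false,a)): right side is now empty
  }
}
```

Only counts are kept per key, which is why the state in the diff is a `(count, expiry time)` tuple rather than the row itself.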


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190124755
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala ---
    @@ -0,0 +1,236 @@
    +package org.apache.flink.table.runtime.setop
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +class StreamIntersectCoProcessFunction(
    +  resultType: TypeInformation[Row],
    +  queryConfig: StreamQueryConfig,
    +  all: Boolean)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +  with Logging {
    +
    +  validateEqualsHashCode("intersect", resultType)
    +
    +  // state to hold left stream element
    --- End diff --
    
    The description is misleading: if I understand correctly, you are not actually holding the "row" of the stream element.


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190124815
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala ---
    @@ -0,0 +1,236 @@
    +package org.apache.flink.table.runtime.setop
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +class StreamIntersectCoProcessFunction(
    +  resultType: TypeInformation[Row],
    +  queryConfig: StreamQueryConfig,
    +  all: Boolean)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +  with Logging {
    +
    +  validateEqualsHashCode("intersect", resultType)
    +
    +  // state to hold left stream element
    +  private var leftState: ValueState[JTuple2[Int, Long]] = _
    +  // state to hold right stream element
    +  private var rightState: ValueState[JTuple2[Int, Long]] = _
    +
    +  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
    +  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
    +  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
    +
    +  // state to record last timer of left stream, 0 means no timer
    +  private var leftTimer: ValueState[Long] = _
    +  // state to record last timer of right stream, 0 means no timer
    +  private var rightTimer: ValueState[Long] = _
    +
    +  private var cRowWrapper: CRowWrappingMultiOutputCollector = _
    +
    +  override def open(parameters: Configuration): Unit = {
    +    LOG.debug("Instantiating StreamIntersectCoProcessFunction.")
    +    // initialize left and right state, the first element of tuple2 indicates how many rows of
    +    // this row, while the second element represents the expired time of this row.
    +    val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
    +    val leftStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]](
    +      "left", tupleTypeInfo)
    +    val rightStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]](
    +      "right", tupleTypeInfo)
    +    leftState = getRuntimeContext.getState(leftStateDescriptor)
    +    rightState = getRuntimeContext.getState(rightStateDescriptor)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1 = new ValueStateDescriptor[Long]("leftTimer", classOf[Long])
    +    leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
    +    val valueStateDescriptor2 = new ValueStateDescriptor[Long]("rightTimer", classOf[Long])
    +    rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
    +
    +    cRowWrapper = new CRowWrappingMultiOutputCollector()
    +    //we emit one record per process at most
    +    cRowWrapper.setTimes(1)
    +  }
    +
    +  override def processElement1(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    out: Collector[CRow]): Unit = {
    +
    +    processElement(value, ctx, out, leftState, leftTimer, rightState)
    +  }
    +
    +  override def processElement2(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    out: Collector[CRow]): Unit = {
    +
    +    processElement(value, ctx, out, rightState, rightTimer, leftState)
    +  }
    +
    +  private def processElement(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    out: Collector[CRow],
    +    currentSideState: ValueState[JTuple2[Int, Long]],
    +    currentSideTimer: ValueState[Long],
    +    otherSideState: ValueState[JTuple2[Int, Long]]): Unit = {
    +
    +    val inputRow = value.row
    +    cRowWrapper.setChange(value.change)
    +    cRowWrapper.setCollector(out)
    +
    +    val cntAndExpiredTime = updateState(value, ctx, currentSideState, currentSideTimer)
    +
    +    val otherSideValue = otherSideState.value()
    +    if (otherSideValue != null) {
    +      if (all) {
    +        if (value.change && cntAndExpiredTime.f0 <= otherSideValue.f0) {
    +          cRowWrapper.collect(inputRow)
    +        } else if (!value.change && cntAndExpiredTime.f0 < otherSideValue.f0) {
    +          cRowWrapper.collect(inputRow)
    +        }
    +      } else {
    +        if (value.change && cntAndExpiredTime.f0 == 1) {
    +          cRowWrapper.collect(inputRow)
    +        } else if (!value.change && cntAndExpiredTime.f0 == 0) {
    +          cRowWrapper.collect(inputRow)
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +    * update valueState and TimerState and return the current state
    +    * @param value
    +    * @param ctx
    +    * @param state
    +    * @param timerState
    +    * @return
    +    */
    +  private def updateState(
    +    value: CRow,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +    state: ValueState[JTuple2[Int, Long]],
    +    timerState: ValueState[Long]): JTuple2[Int, Long] = {
    +
    +    val curProcessTime = ctx.timerService.currentProcessingTime
    +    val oldCntAndExpiredTime = state.value()
    +    val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
    +      JTuple2.of(0, -1L)
    +    } else {
    +      oldCntAndExpiredTime
    +    }
    +
    +    cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
    +    if (stateCleaningEnabled && timerState.value() == 0) {
    +      timerState.update(cntAndExpiredTime.f1)
    +      ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
    +    }
    +
    +    if (!value.change) {
    +      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
    +      if (cntAndExpiredTime.f0 <= 0) {
    +        state.clear()
    +      } else {
    +        state.update(cntAndExpiredTime)
    +      }
    +    } else {
    +      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
    +      state.update(cntAndExpiredTime)
    +    }
    +    cntAndExpiredTime
    +
    +  }
    +
    +  def getNewExpiredTime(
    +   curProcessTime: Long,
    +   oldExpiredTime: Long): Long = {
    +    if (stateCleaningEnabled && curProcessTime + minRetentionTime > oldExpiredTime) {
    +      curProcessTime + maxRetentionTime
    +    } else {
    +      oldExpiredTime
    +    }
    +  }
    +
    +  override def onTimer(
    +    timestamp: Long,
    +    ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +    out: Collector[CRow]): Unit = {
    +
    +    if (stateCleaningEnabled && leftTimer.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftState,
    +        leftTimer,
    +        ctx
    +      )
    +    }
    +
    +    if (stateCleaningEnabled && rightTimer.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightState,
    +        rightTimer,
    +        ctx
    +      )
    +    }
    +  }
    +
    +  private def expireOutTimeRow(
    --- End diff --
    
    Consider overriding the queryConfig in a test to trigger this code path.
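
The override pattern suggested here can be sketched without Flink. In the sketch below, `RetentionConfig` is a hypothetical stand-in that exposes only the two getters the diff reads from `queryConfig`, its default of `0` is an assumption of the sketch, and `ShortRetentionConfig` is an invented test-only subclass.

```scala
object QueryConfigOverrideSketch {
  // Hypothetical stand-in for the two getters the diff reads from queryConfig.
  class RetentionConfig {
    def getMinIdleStateRetentionTime: Long = 0L
    def getMaxIdleStateRetentionTime: Long = 0L
  }

  // Test-only subclass with millisecond-scale retention, so a cleanup
  // timer becomes due shortly after an element is processed.
  class ShortRetentionConfig(min: Long, max: Long) extends RetentionConfig {
    override def getMinIdleStateRetentionTime: Long = min
    override def getMaxIdleStateRetentionTime: Long = max
  }

  // Same check as `stateCleaningEnabled` in the diff.
  def cleaningEnabled(c: RetentionConfig): Boolean =
    c.getMinIdleStateRetentionTime > 1

  def main(args: Array[String]): Unit = {
    println(cleaningEnabled(new RetentionConfig))            // false: no timers registered
    println(cleaningEnabled(new ShortRetentionConfig(2, 4))) // true: the expiry path is reachable
  }
}
```

With the default values the function never registers a timer, so a test that passes the unmodified configuration cannot exercise the expiry code.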


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190120842
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala ---
    @@ -0,0 +1,236 @@
    +package org.apache.flink.table.runtime.setop
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +class StreamIntersectCoProcessFunction(
    --- End diff --
    
    Missing JavaDoc


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190119234
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamIntersect.scala ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.CRowKeySelector
    +import org.apache.flink.table.runtime.setop.StreamIntersectCoProcessFunction
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +import scala.collection.JavaConverters._
    +
    +class DataStreamIntersect(
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  leftNode: RelNode,
    +  rightNode: RelNode,
    +  rowRelDataType: RelDataType,
    +  all: Boolean)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with DataStreamRel {
    +
    +  private lazy val intersectType = if (all) {
    +    "All"
    +  } else {
    +    ""
    +  }
    +
    +  override def needsUpdatesAsRetraction: Boolean = true
    +
    +  override def deriveRowType() = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamIntersect(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      getRowType,
    +      all
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"Intersect$intersectType(intersect$intersectType: ($intersectSelectionToString))"
    --- End diff --
    
    `s"Intersect$intersectType(intersect: ($intersectSelectionToString))"`
    I don't think you need to include the type twice.
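
To illustrate the suggested format, here is a small standalone sketch. `IntersectLabelSketch` and its `label` method are hypothetical; `selection` stands in for the value of `intersectSelectionToString`, which is not shown in the diff.

```scala
object IntersectLabelSketch {
  // Builds the node description with the "All" suffix appearing once, as suggested above.
  def label(all: Boolean, selection: String): String = {
    val intersectType = if (all) "All" else ""
    s"Intersect$intersectType(intersect: ($selection))"
  }
}
```

Under this sketch, `label(all = true, "a, b")` yields `IntersectAll(intersect: (a, b))` and `label(all = false, "a, b")` yields `Intersect(intersect: (a, b))`.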


---

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5998#discussion_r190171657
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala ---
    @@ -0,0 +1,236 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.setop
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +class StreamIntersectCoProcessFunction(
    +  resultType: TypeInformation[Row],
    +  queryConfig: StreamQueryConfig,
    +  all: Boolean)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    --- End diff --
    
    I think it makes sense to have two implementations of this operator:
    1. For tables with a time attribute. This implementation works without retractions and can clean up its state automatically.
    2. For tables without time attributes. This implementation needs to clean up state based on the retention time and produces retractions.
    
    This PR seems to address both cases, which is fine for now. We can improve case 1 later on. Both cases should be implemented as a `CoProcessFunction`. IMO, we should try to be independent of the DataStream window operators.
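
As a rough illustration of case 2 only, the sketch below keeps a pair of counters per row and emits inserts or retractions whenever the intersection's multiplicity changes. It is an assumption-laden simplification rather than the PR's implementation: `Change` and `IntersectSketch` are hypothetical names, state lives in a plain in-memory map instead of Flink keyed state, and retention-based cleanup is left out.

```scala
import scala.collection.mutable

// Hypothetical simplified change message: isInsert = true is an insert, false a retraction.
final case class Change[K](key: K, isInsert: Boolean)

final class IntersectSketch[K](all: Boolean) {
  // Per-row counters: (rows seen on the left input, rows seen on the right input).
  private val counts = mutable.Map.empty[K, (Long, Long)]

  // How often a row appears in the result: min(l, r) for INTERSECT ALL,
  // at most once for INTERSECT.
  private def multiplicity(l: Long, r: Long): Long =
    if (all) math.min(l, r) else if (l > 0 && r > 0) 1L else 0L

  def onLeft(c: Change[K]): Seq[Change[K]] = update(c, left = true)
  def onRight(c: Change[K]): Seq[Change[K]] = update(c, left = false)

  private def update(c: Change[K], left: Boolean): Seq[Change[K]] = {
    val (l, r) = counts.getOrElse(c.key, (0L, 0L))
    val delta = if (c.isInsert) 1L else -1L
    val (nl, nr) =
      if (left) (math.max(0L, l + delta), r) else (l, math.max(0L, r + delta))
    // Drop the entry once neither input holds the row any more.
    if (nl == 0L && nr == 0L) counts.remove(c.key) else counts.update(c.key, (nl, nr))
    // Emit one insert or retraction per unit of change in the result multiplicity.
    val diff = multiplicity(nl, nr) - multiplicity(l, r)
    Seq.fill(math.abs(diff).toInt)(Change(c.key, diff > 0))
  }
}
```

In this sketch a row is emitted only once both inputs have seen it, and retracting the last matching row on either input emits a retraction downstream. A variant for case 1 would instead bound the state using the time attribute.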


---