Posted to issues@flink.apache.org by hongyuhong <gi...@git.apache.org> on 2017/07/06 03:30:34 UTC

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

GitHub user hongyuhong opened a pull request:

    https://github.com/apache/flink/pull/4266

    [FLINK-6232][Table&Sql] support proctime inner windowed stream join

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hongyuhong/flink flink-6232-re

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4266
    
----
commit e2baf558e100eb7bb0d6b9927cb166b892f78d8f
Author: hongyuhong <ho...@huawei.com>
Date:   2017-07-06T03:24:04Z

    [FLINK-6232][Table&Sql] support proctime inner windowed stream join

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127178517
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.table.updateutils.UpdateCheckUtils
    +
    +/**
    +  * Flink RelNode which matches along with JoinOperator and its related operations.
    +  */
    +class DataStreamWindowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinCondition: RexNode,
    +    joinType: JoinRelType,
    +    leftSchema: RowSchema,
    +    rightSchema: RowSchema,
    +    schema: RowSchema,
    +    isRowTime: Boolean,
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    remainCondition: Option[RexNode],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with CommonJoin
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamWindowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinCondition,
    +      joinType,
    +      leftSchema,
    +      rightSchema,
    +      schema,
    +      isRowTime,
    +      leftLowerBound,
    +      leftUpperBound,
    +      remainCondition,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    joinToString(
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    joinExplainTerms(
    +      super.explainTerms(pw),
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: StreamTableEnvironment,
    +      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
    --- End diff --
    
    The `UpdateCheckUtils.isAppendOnly` method recursively checks if any downstream operator produces updates. As soon as any downstream operator produces updates, the given operator has to be able to handle them.
    
    Updates can be encoded as retractions or be implicit key-wise updates if the update-producing and update-receiving operators use the same keys. Retraction updates are encoded as two messages. Non-retraction updates are encoded as a single message and require a key to which they relate (`CRow.change == true` -> insert or update per key, `CRow.change == false` -> delete on key). Right now, only UpsertTableSinks use non-retraction updates, but other operators such as unbounded joins will use them as well.
    
    So even if `AccRetract` is false, the input might produce updates, but those updates are encoded differently, i.e., in a single message. The window stream join is not able to handle updates (it ignores the `CRow.change` flag). Therefore, we must ensure that the inputs do not produce updates.
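
The two update encodings described in the comment above can be illustrated with a minimal, self-contained sketch. This is not code from the PR: `Msg` is a hypothetical stand-in for Flink's `CRow`, the helper names are invented for illustration, and the flag values used for retraction mode (`false` for the retract message, `true` for the add message) are an assumption about how the two messages are marked.

```scala
// Hypothetical stand-in for CRow: a keyed row plus a change flag.
final case class Msg(key: Int, value: String, change: Boolean)

object UpdateEncodings {
  // Retraction mode: one logical update becomes two messages,
  // a retract of the old row followed by an add of the new row.
  def asRetraction(key: Int, oldValue: String, newValue: String): Seq[Msg] =
    Seq(Msg(key, oldValue, change = false), Msg(key, newValue, change = true))

  // Key-wise (non-retraction) mode: one logical update is a single message
  // that replaces whatever was stored for the same key.
  def asUpsert(key: Int, newValue: String): Seq[Msg] =
    Seq(Msg(key, newValue, change = true))

  // An operator that ignores the change flag treats every message as an
  // independent insert, which is only correct for append-only inputs.
  def flagIgnoringView(msgs: Seq[Msg]): Seq[(Int, String)] =
    msgs.map(m => (m.key, m.value))
}
```

With a key-wise input, `flagIgnoringView(asUpsert(1, "a") ++ asUpsert(1, "b"))` yields two rows for key 1 even though the second message was meant to replace the first, which is why an operator that ignores the flag needs append-only inputs.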



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r126683783
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class JoinITCase extends StreamingWithStateTestBase {
    +
    +  val data = List(
    +    (1L, 1, "Hello"),
    +    (2L, 2, "Hello"),
    +    (3L, 3, "Hello"),
    +    (4L, 4, "Hello"),
    +    (5L, 5, "Hello"),
    +    (6L, 6, "Hello"),
    +    (7L, 7, "Hello World"),
    +    (8L, 8, "Hello World"),
    +    (20L, 20, "Hello World"))
    +
    +  /** test process time inner join **/
    +  @Test
    +  def testProcessTimeInnerJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +
    +    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " +
    +      "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second"
    +
    +    val data1 = new mutable.MutableList[(Int, Long, String)]
    +    data1.+=((1, 1L, "Hi1"))
    +    data1.+=((1, 2L, "Hi2"))
    +    data1.+=((1, 5L, "Hi3"))
    +    data1.+=((2, 7L, "Hi5"))
    +    data1.+=((1, 9L, "Hi6"))
    +    data1.+=((1, 8L, "Hi8"))
    +
    +    val data2 = new mutable.MutableList[(Int, Long, String)]
    +    data2.+=((1, 1L, "HiHi"))
    +    data2.+=((2, 2L, "HeHe"))
    +
    +    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    +    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    +
    +    tEnv.registerTable("T1", t1)
    +    tEnv.registerTable("T2", t2)
    +
    +    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +  }
    +
    +  /** test process time inner join with other condition **/
    +  @Test
    +  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    --- End diff --
    
    You can simply do `StreamITCase.clear` instead of this.



[GitHub] flink issue #4266: [FLINK-6232][Table&Sql] support proctime inner windowed s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4266
  
    Thanks for the review @wuchong.
    I'll address your comments in my upcoming PR.



[GitHub] flink issue #4266: [FLINK-6232][Table&Sql] support proctime inner windowed s...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4266
  
    Hi @hongyuhong, is this the same PR as #3715? To rebase or remove a merge commit, please do not create a new PR; otherwise committers may review an outdated PR or lose the review context.
    
    You can force update your repo branch via `git push <your-repo-name> flink-6232 --force` and close this PR.
    
    Thanks,
    Jark  



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127193974
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.table.updateutils.UpdateCheckUtils
    +
    +/**
    +  * Flink RelNode which matches along with JoinOperator and its related operations.
    +  */
    +class DataStreamWindowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinCondition: RexNode,
    +    joinType: JoinRelType,
    +    leftSchema: RowSchema,
    +    rightSchema: RowSchema,
    +    schema: RowSchema,
    +    isRowTime: Boolean,
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    remainCondition: Option[RexNode],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with CommonJoin
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamWindowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinCondition,
    +      joinType,
    +      leftSchema,
    +      rightSchema,
    +      schema,
    +      isRowTime,
    +      leftLowerBound,
    +      leftUpperBound,
    +      remainCondition,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    joinToString(
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    joinExplainTerms(
    +      super.explainTerms(pw),
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: StreamTableEnvironment,
    +      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
    --- End diff --
    
    Yes, I think you are right. These checks should also check for updates and not retraction mode. 
    
    Maybe it makes sense to integrate the whole append-only/updates check into the decoration rules. Same for the inference of unique keys (the other method in `UpdateCheckUtils`).



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127176219
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.join
    +
    +import java.util
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.codegen.Compiler
    +import org.apache.flink.table.runtime.CRowWrappingCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +/**
    +  * A CoProcessFunction to support stream join stream, currently just support inner-join
    +  *
    +  * @param leftLowerBound
    +  *        the left stream lower bound, and -leftLowerBound is the right stream upper bound
    +  * @param leftUpperBound
    +  *        the left stream upper bound, and -leftUpperBound is the right stream lower bound
    +  * @param element1Type  the input type of left stream
    +  * @param element2Type  the input type of right stream
    +  * @param genJoinFuncName    the function name of other non-equi condition
    +  * @param genJoinFuncCode    the function code of other non-equi condition
    +  *
    +  */
    +class ProcTimeWindowInnerJoin(
    +    private val leftLowerBound: Long,
    +    private val leftUpperBound: Long,
    +    private val element1Type: TypeInformation[Row],
    +    private val element2Type: TypeInformation[Row],
    +    private val genJoinFuncName: String,
    +    private val genJoinFuncCode: String)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +    with Compiler[FlatJoinFunction[Row, Row, Row]]{
    +
    +  private var cRowWrapper: CRowWrappingCollector = _
    +
    +  /** other condition function **/
    +  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
    +
    +  /** tmp list to store expired records **/
    +  private var listToRemove: JList[Long] = _
    +
    +  /** state to hold left stream element **/
    +  private var row1MapState: MapState[Long, JList[Row]] = _
    +  /** state to hold right stream element **/
    +  private var row2MapState: MapState[Long, JList[Row]] = _
    +
    +  /** state to record last timer of left stream, 0 means no timer **/
    +  private var timerState1: ValueState[Long] = _
    +  /** state to record last timer of right stream, 0 means no timer **/
    +  private var timerState2: ValueState[Long] = _
    +
    +  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
    +  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +
    +  override def open(config: Configuration) {
    +    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
    +      s"Code:\n$genJoinFuncCode")
    +    val clazz = compile(
    +      getRuntimeContext.getUserCodeClassLoader,
    +      genJoinFuncName,
    +      genJoinFuncCode)
    +    LOG.debug("Instantiating JoinFunction.")
    +    joinFunction = clazz.newInstance()
    +
    +    listToRemove = new util.ArrayList[Long]()
    +    cRowWrapper = new CRowWrappingCollector()
    +    cRowWrapper.setChange(true)
    +
    +    // initialize row state
    +    val rowListTypeInfo1: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
    +    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
    +    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
    +
    +    val rowListTypeInfo2: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element2Type)
    +    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
    +    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    +    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
    +
    +    val valueStateDescriptor2: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
    +  }
    +
    +  /**
    +    * Process left stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement1(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      leftStreamWinSize,
    +      timerState1,
    +      row1MapState,
    +      row2MapState,
    +      -leftUpperBound,     // right stream lower
    +      -leftLowerBound,     // right stream upper
    +      true
    +    )
    +  }
    +
    +  /**
    +    * Process right stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement2(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      rightStreamWinSize,
    +      timerState2,
    +      row2MapState,
    +      row1MapState,
    +      leftLowerBound,    // left stream lower
    +      leftUpperBound,    // left stream upper
    +      false
    +    )
    +  }
    +
    +  /**
    +    * Called when a processing timer trigger.
    +    * Expire left/right records which earlier than current time - windowsize.
    +    *
    +    * @param timestamp The timestamp of the firing timer.
    +    * @param ctx       The ctx to register timer or get current time
    +    * @param out       The collector for returning result values.
    +    */
    +  override def onTimer(
    +      timestamp: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +      out: Collector[CRow]): Unit = {
    +
    +    if (timerState1.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftStreamWinSize,
    +        row1MapState,
    +        timerState1,
    +        ctx
    +      )
    +    }
    +
    +    if (timerState2.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightStreamWinSize,
    +        row2MapState,
    +        timerState2,
    +        ctx
    +      )
    +    }
    +  }
    +
    +  /**
    +    * Puts an element from the input stream into state and search the other state to
    +    * output records meet the condition, and registers a timer for the current record
    +    * if there is no timer at present.
    +    */
    +  private def processElement(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      winSize: Long,
    +      timerState: ValueState[Long],
    +      rowMapState: MapState[Long, JList[Row]],
    +      oppoRowMapState: MapState[Long, JList[Row]],
    +      oppoLowerBound: Long,
    +      oppoUpperBound: Long,
    +      isLeft: Boolean): Unit = {
    +
    +    cRowWrapper.out = out
    +
    +    val value = valueC.row
    +
    +    val curProcessTime = ctx.timerService.currentProcessingTime
    +    val oppoLowerTime = curProcessTime + oppoLowerBound
    +    val oppoUpperTime = curProcessTime + oppoUpperBound
    +
    +    // only when windowsize != 0, we need to store the element
    +    if (winSize != 0) {
    +      // register a timer to expire the element
    +      if (timerState.value == 0) {
    +        ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
    +        timerState.update(curProcessTime + winSize + 1)
    +      }
    +
    +      var rowList = rowMapState.get(curProcessTime)
    +      if (rowList == null) {
    +        rowList = new util.ArrayList[Row]()
    +      }
    +      rowList.add(value)
    +      rowMapState.put(curProcessTime, rowList)
    +
    +    }
    +
    +    // loop the other stream elements
    +    val oppositeKeyIter = oppoRowMapState.keys().iterator()
    +    while (oppositeKeyIter.hasNext) {
    +      val eleTime = oppositeKeyIter.next()
    +      if (eleTime < oppoLowerTime) {
    +        listToRemove.add(eleTime)
    +      } else if (eleTime <= oppoUpperTime) {
    +        val oppoRowList = oppoRowMapState.get(eleTime)
    +        var i = 0
    +        if (isLeft) {
    +          while (i < oppoRowList.size) {
    +            joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
    +            i += 1
    +          }
    +        } else {
    +          while (i < oppoRowList.size) {
    +            joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
    +            i += 1
    +          }
    +        }
    +      }
    +    }
    +
    +    // expire records out-of-time
    +    var i = listToRemove.size - 1
    +    while (i >= 0) {
    +      oppoRowMapState.remove(listToRemove.get(i))
    +      i -= 1
    +    }
    +    listToRemove.clear()
    +  }
    +
    +  /**
    +    * Removes records which are outside the join window from the state.
    +    * Registers a new timer if the state still holds records after the clean-up.
    +    */
    +  private def expireOutTimeRow(
    +      curTime: Long,
    +      winSize: Long,
    +      rowMapState: MapState[Long, JList[Row]],
    +      timerState: ValueState[Long],
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
    +
    +    val expiredTime = curTime - winSize
    +    val keyIter = rowMapState.keys().iterator()
    +    var nextTimer: Long = 0
    +    // Search for expired timestamps.
    +    // If we find a non-expired timestamp, remember the timestamp and leave the loop.
    +    // This way we find all expired timestamps if they are sorted without doing a full pass.
    +    while (keyIter.hasNext && nextTimer == 0) {
    +      val recordTime = keyIter.next
    +      if (recordTime < expiredTime) {
    +        listToRemove.add(recordTime)
    +      } else {
    +        nextTimer = recordTime
    +      }
    +    }
    +
    +    // Remove expired records from state
    +    var i = listToRemove.size - 1
    +    while (i >= 0) {
    +      rowMapState.remove(listToRemove.get(i))
    +      i -= 1
    +    }
    +    listToRemove.clear()
    +
    +    // If the state has non-expired timestamps, register a new timer.
    +    // Otherwise clean the complete state for this input.
    +    if (nextTimer != 0) {
    +      ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
    --- End diff --
    
    Yes, that makes sense to me.
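
For readers following the quoted diff, the relationship between the join bounds and the per-stream lookup ranges can be sketched on its own. This is a simplified illustration under stated assumptions, not the PR's implementation: `WindowBounds` and its method names are hypothetical, and the bounds are assumed to be in milliseconds, so that a predicate such as `t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second` corresponds to `leftLower = -5000` and `leftUpper = 5000`.

```scala
object WindowBounds {
  // Inclusive range of right-stream timestamps that can join with a left row
  // arriving at `now`: the left bounds are negated and swapped.
  def rightRangeForLeftRow(now: Long, leftLower: Long, leftUpper: Long): (Long, Long) =
    (now - leftUpper, now - leftLower)

  // Inclusive range of left-stream timestamps that can join with a right row
  // arriving at `now`: the left bounds apply directly.
  def leftRangeForRightRow(now: Long, leftLower: Long, leftUpper: Long): (Long, Long) =
    (now + leftLower, now + leftUpper)

  // How long rows of each side must be kept in state. A side with retention 0
  // never needs to be stored, because no later row of the other side can match it.
  def leftRetention(leftLower: Long): Long = if (leftLower < 0) -leftLower else 0
  def rightRetention(leftUpper: Long): Long = if (leftUpper > 0) leftUpper else 0
}
```

For an asymmetric window with `leftLower = -2000` and `leftUpper = 1000`, a left row arriving at 10000 looks up right rows in [9000, 12000], while a right row arriving at 10000 looks up left rows in [8000, 11000].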



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r126683700
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class JoinITCase extends StreamingWithStateTestBase {
    +
    +  val data = List(
    +    (1L, 1, "Hello"),
    +    (2L, 2, "Hello"),
    +    (3L, 3, "Hello"),
    +    (4L, 4, "Hello"),
    +    (5L, 5, "Hello"),
    +    (6L, 6, "Hello"),
    +    (7L, 7, "Hello World"),
    +    (8L, 8, "Hello World"),
    +    (20L, 20, "Hello World"))
    +
    +  /** test process time inner join **/
    +  @Test
    +  def testProcessTimeInnerJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    --- End diff --
    
    You can simply do `StreamITCase.clear` instead of this.
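
The point of the suggestion is to reset shared test state through one helper instead of reassigning the field in every test. A minimal sketch of that pattern follows; `ResultSinkSketch` is a hypothetical stand-in and does not reproduce the actual `StreamITCase` code.

```scala
import scala.collection.mutable

object ResultSinkSketch {

  // Shared buffer that test sinks append their output to
  // (hypothetical stand-in for StreamITCase.testResults).
  val testResults: mutable.ListBuffer[String] = mutable.ListBuffer.empty[String]

  /** Resets the shared buffer in place so that every test starts empty. */
  def clear(): Unit = testResults.clear()
}
```

Clearing in place keeps all references to the buffer valid, and any additional shared buffers can later be reset in the same method without touching the tests.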



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127166606
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.table.updateutils.UpdateCheckUtils
    +
    +/**
    +  * Flink RelNode which matches along with JoinOperator and its related operations.
    +  */
    +class DataStreamWindowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinCondition: RexNode,
    +    joinType: JoinRelType,
    +    leftSchema: RowSchema,
    +    rightSchema: RowSchema,
    +    schema: RowSchema,
    +    isRowTime: Boolean,
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    remainCondition: Option[RexNode],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with CommonJoin
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamWindowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinCondition,
    +      joinType,
    +      leftSchema,
    +      rightSchema,
    +      schema,
    +      isRowTime,
    +      leftLowerBound,
    +      leftUpperBound,
    +      remainCondition,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    joinToString(
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    joinExplainTerms(
    +      super.explainTerms(pw),
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: StreamTableEnvironment,
    +      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
    --- End diff --
    
    `isAccRetract` only checks how updates are encoded but not whether there are updates.
    The current approach is correct, IMO.
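
The distinction can be illustrated with a deliberately simplified model. Everything in this sketch is hypothetical (`Node`, `Encoding`, and both predicates are illustration-only and are not Flink's classes); it only shows why a check on the update encoding cannot replace a check on whether updates exist.

```scala
object UpdateModeSketch {

  // How a node encodes updates (hypothetical model).
  sealed trait Encoding
  case object AccMode extends Encoding        // an update replaces the earlier result
  case object AccRetractMode extends Encoding // an update is a retraction plus a new result

  final case class Node(producesUpdates: Boolean, encoding: Encoding)

  /** Answers "how are updates encoded?" */
  def isAccRetract(node: Node): Boolean = node.encoding == AccRetractMode

  /** Answers "are there any updates at all?" */
  def isAppendOnly(node: Node): Boolean = !node.producesUpdates
}
```

In this model, a node that produces updates in `AccMode` is neither append-only nor in retraction mode, so `!isAccRetract` would wrongly accept it as an input without updates.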



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127192628
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.table.updateutils.UpdateCheckUtils
    +
    +/**
    +  * Flink RelNode which matches along with JoinOperator and its related operations.
    +  */
    +class DataStreamWindowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinCondition: RexNode,
    +    joinType: JoinRelType,
    +    leftSchema: RowSchema,
    +    rightSchema: RowSchema,
    +    schema: RowSchema,
    +    isRowTime: Boolean,
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    remainCondition: Option[RexNode],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with CommonJoin
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamWindowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinCondition,
    +      joinType,
    +      leftSchema,
    +      rightSchema,
    +      schema,
    +      isRowTime,
    +      leftLowerBound,
    +      leftUpperBound,
    +      remainCondition,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    joinToString(
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    joinExplainTerms(
    +      super.explainTerms(pw),
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: StreamTableEnvironment,
    +      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
    --- End diff --
    
    Thank you for the explanation, that makes sense to me. However, `DataStreamOverAggregate` and `DataStreamGroupWindowAggregate` use `DataStreamRetractionRules.isAccRetract`. Is that a misuse?



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r126671128
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.join
    +
    +import java.math.{BigDecimal => JBigDecimal}
    --- End diff --
    
    remove unused import



[GitHub] flink issue #4266: [FLINK-6232][Table&Sql] support proctime inner windowed s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4266
  
    Thanks for the update @hongyuhong!
    I will take this PR from here. The logic looks very good but I would like to refactor some parts (mainly the `WindowJoinUtil`).
    
    I will open a new PR with your work and my commit on top, probably later today. 
    It would be great if you could review and check my PR.
    
    @wuchong your review is of course also highly welcome :-)
    
    Thank you, Fabian



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127165003
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.table.updateutils.UpdateCheckUtils
    +
    +/**
    +  * Flink RelNode which matches along with JoinOperator and its related operations.
    +  */
    +class DataStreamWindowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinCondition: RexNode,
    +    joinType: JoinRelType,
    +    leftSchema: RowSchema,
    +    rightSchema: RowSchema,
    +    schema: RowSchema,
    +    isRowTime: Boolean,
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    remainCondition: Option[RexNode],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with CommonJoin
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamWindowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinCondition,
    +      joinType,
    +      leftSchema,
    +      rightSchema,
    +      schema,
    +      isRowTime,
    +      leftLowerBound,
    +      leftUpperBound,
    +      remainCondition,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    joinToString(
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    joinExplainTerms(
    +      super.explainTerms(pw),
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: StreamTableEnvironment,
    +      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
    --- End diff --
    
    We should use `DataStreamRetractionRules.isAccRetract(input)` to check whether the input will produce updates.



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r126680044
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.join
    +
    +import java.math.{BigDecimal => JBigDecimal}
    +import java.util
    +
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.SqlKind
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.{TableConfig, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
    +import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
    +import org.apache.flink.types.Row
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * An util class to help analyze and build join code .
    +  */
    +object WindowJoinUtil {
    +
    +  /**
    +    * Analyze time-condtion to get time boundary for each stream and get the time type
    --- End diff --
    
    minor typo: condtion -> condition



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127173843
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.join
    +
    +import java.util
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.codegen.Compiler
    +import org.apache.flink.table.runtime.CRowWrappingCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +/**
    +  * A CoProcessFunction for stream-stream joins; currently only inner joins are supported.
    +  *
    +  * @param leftLowerBound
    +  *        the left stream lower bound, and -leftLowerBound is the right stream upper bound
    +  * @param leftUpperBound
    +  *        the left stream upper bound, and -leftUpperBound is the right stream lower bound
    +  * @param element1Type  the input type of left stream
    +  * @param element2Type  the input type of right stream
    +  * @param genJoinFuncName    the function name of other non-equi condition
    +  * @param genJoinFuncCode    the function code of other non-equi condition
    +  *
    +  */
    +class ProcTimeWindowInnerJoin(
    +    private val leftLowerBound: Long,
    +    private val leftUpperBound: Long,
    +    private val element1Type: TypeInformation[Row],
    +    private val element2Type: TypeInformation[Row],
    +    private val genJoinFuncName: String,
    +    private val genJoinFuncCode: String)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +    with Compiler[FlatJoinFunction[Row, Row, Row]]{
    +
    +  private var cRowWrapper: CRowWrappingCollector = _
    +
    +  /** other condition function **/
    +  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
    +
    +  /** tmp list to store expired records **/
    +  private var listToRemove: JList[Long] = _
    +
    +  /** state to hold left stream element **/
    +  private var row1MapState: MapState[Long, JList[Row]] = _
    +  /** state to hold right stream element **/
    +  private var row2MapState: MapState[Long, JList[Row]] = _
    +
    +  /** state to record last timer of left stream, 0 means no timer **/
    +  private var timerState1: ValueState[Long] = _
    +  /** state to record last timer of right stream, 0 means no timer **/
    +  private var timerState2: ValueState[Long] = _
    +
    +  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
    +  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +
    +  override def open(config: Configuration) {
    +    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
    +      s"Code:\n$genJoinFuncCode")
    +    val clazz = compile(
    +      getRuntimeContext.getUserCodeClassLoader,
    +      genJoinFuncName,
    +      genJoinFuncCode)
    +    LOG.debug("Instantiating JoinFunction.")
    +    joinFunction = clazz.newInstance()
    +
    +    listToRemove = new util.ArrayList[Long]()
    +    cRowWrapper = new CRowWrappingCollector()
    +    cRowWrapper.setChange(true)
    +
    +    // initialize row state
    +    val rowListTypeInfo1: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
    +    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
    +    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
    +
    +    val rowListTypeInfo2: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element2Type)
    +    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
    +    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    +    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
    +
    +    val valueStateDescriptor2: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
    +  }
    +
    +  /**
    +    * Process left stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement1(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      leftStreamWinSize,
    +      timerState1,
    +      row1MapState,
    +      row2MapState,
    +      -leftUpperBound,     // right stream lower
    +      -leftLowerBound,     // right stream upper
    +      true
    +    )
    +  }
    +
    +  /**
    +    * Process right stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement2(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      rightStreamWinSize,
    +      timerState2,
    +      row2MapState,
    +      row1MapState,
    +      leftLowerBound,    // left stream lower
    +      leftUpperBound,    // left stream upper
    +      false
    +    )
    +  }
    +
    +  /**
    +    * Called when a processing-time timer fires.
    +    * Expires left/right records that are older than current time - window size.
    +    *
    +    * @param timestamp The timestamp of the firing timer.
    +    * @param ctx       The ctx to register timer or get current time
    +    * @param out       The collector for returning result values.
    +    */
    +  override def onTimer(
    +      timestamp: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +      out: Collector[CRow]): Unit = {
    +
    +    if (timerState1.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftStreamWinSize,
    +        row1MapState,
    +        timerState1,
    +        ctx
    +      )
    +    }
    +
    +    if (timerState2.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightStreamWinSize,
    +        row2MapState,
    +        timerState2,
    +        ctx
    +      )
    +    }
    +  }
    +
    +  /**
    +    * Puts an element from the input stream into state, searches the other state for
    +    * records that meet the join condition and emits them, and registers a timer for
    +    * the current record if no timer is registered yet.
    +    */
    +  private def processElement(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      winSize: Long,
    +      timerState: ValueState[Long],
    +      rowMapState: MapState[Long, JList[Row]],
    +      oppoRowMapState: MapState[Long, JList[Row]],
    +      oppoLowerBound: Long,
    +      oppoUpperBound: Long,
    +      isLeft: Boolean): Unit = {
    +
    +    cRowWrapper.out = out
    +
    +    val value = valueC.row
    +
    +    val curProcessTime = ctx.timerService.currentProcessingTime
    +    val oppoLowerTime = curProcessTime + oppoLowerBound
    +    val oppoUpperTime = curProcessTime + oppoUpperBound
    +
    +    // only when windowsize != 0, we need to store the element
    +    if (winSize != 0) {
    --- End diff --
    
    I think you are right @wuchong. I'll remove that condition.
    OTOH, this is a processing time join which cannot guarantee strict results anyway ;-)
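
For reference, the `winSize` tested by this condition is derived from the join bounds. The following is a minimal sketch, assuming bounds in milliseconds and mirroring the `leftStreamWinSize` / `rightStreamWinSize` expressions of the `ProcTimeWindowInnerJoin` class under review; the object name `WindowSizeSketch` is hypothetical and not part of the PR.

```scala
object WindowSizeSketch {
  // How long left-stream records must be kept: only a negative lower bound
  // requires looking back at earlier left records.
  def leftStreamWinSize(leftLowerBound: Long): Long =
    if (leftLowerBound < 0) -leftLowerBound else 0L

  // How long right-stream records must be kept: only a positive upper bound
  // requires looking back at earlier right records.
  def rightStreamWinSize(leftUpperBound: Long): Long =
    if (leftUpperBound > 0) leftUpperBound else 0L
}
```

With a lower bound of 0 the left window size is 0, so the quoted `if (winSize != 0)` branch would skip storing left records altogether.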


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r126684002
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class JoinITCase extends StreamingWithStateTestBase {
    +
    +  val data = List(
    --- End diff --
    
    It looks like `data` is never used. Can we remove it?



[GitHub] flink issue #4266: [FLINK-6232][Table&Sql] support proctime inner windowed s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4266
  
    Hi @hongyuhong and @wuchong, I opened a new PR which extends this PR. 
    Please have a look and give feedback.
    
    @hongyuhong can you close the PRs #3715 and this one? 
    
    Thank you, Fabian



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by hongyuhong <gi...@git.apache.org>.
Github user hongyuhong closed the pull request at:

    https://github.com/apache/flink/pull/4266



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r126671328
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.table.updateutils.UpdateCheckUtils
    +
    +/**
    +  * Flink RelNode which matches along with JoinOperator and its related operations.
    +  */
    +class DataStreamWindowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinCondition: RexNode,
    +    joinType: JoinRelType,
    +    leftSchema: RowSchema,
    +    rightSchema: RowSchema,
    +    schema: RowSchema,
    +    isRowTime: Boolean,
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    remainCondition: Option[RexNode],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with CommonJoin
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamWindowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinCondition,
    +      joinType,
    +      leftSchema,
    +      rightSchema,
    +      schema,
    +      isRowTime,
    +      leftLowerBound,
    +      leftUpperBound,
    +      remainCondition,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    joinToString(
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    joinExplainTerms(
    +      super.explainTerms(pw),
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: StreamTableEnvironment,
    +      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
    +    val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
    +    if (!isLeftAppendOnly || !isRightAppendOnly) {
    +      throw new TableException(
    +        "Windowed stream join does not support updates.")
    +    }
    +
    +    val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
    +    val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
    +
    +    // get the equality keys and other condition
    +    val joinInfo = JoinInfo.of(leftNode, rightNode, joinCondition)
    +    val leftKeys = joinInfo.leftKeys.toIntArray
    +    val rightKeys = joinInfo.rightKeys.toIntArray
    +
    +    // generate join function
    +    val joinFunction =
    +    WindowJoinUtil.generateJoinFunction(
    +      config,
    +      joinType,
    +      leftSchema.physicalTypeInfo,
    +      rightSchema.physicalTypeInfo,
    +      schema,
    +      remainCondition,
    +      ruleDescription)
    +
    +    joinType match {
    +      case JoinRelType.INNER =>
    +        isRowTime match {
    --- End diff --
    
    I think using an `if (isRowTime) ... else ...` here would be simpler.
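
To illustrate the suggestion, here is a minimal sketch comparing the two styles. The object and method names and the returned strings are hypothetical stand-ins for the two translation paths; they are not taken from the PR.

```scala
object BooleanBranchSketch {
  // Pattern matching on a Boolean, as in the quoted diff.
  def describeWithMatch(isRowTime: Boolean): String =
    isRowTime match {
      case true  => "rowtime join"
      case false => "proctime join"
    }

  // Equivalent if/else, as suggested in the review comment.
  def describeWithIf(isRowTime: Boolean): String =
    if (isRowTime) "rowtime join" else "proctime join"
}
```

Both forms behave identically; the `if`/`else` version simply avoids the extra `case` syntax for a two-valued condition.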



[GitHub] flink issue #4266: [FLINK-6232][Table&Sql] support proctime inner windowed s...

Posted by hongyuhong <gi...@git.apache.org>.
Github user hongyuhong commented on the issue:

    https://github.com/apache/flink/pull/4266
  
    Hi @wuchong, thanks for the reminder. The new commit still contains some modifications, so I would like to keep the older commit to make reviewing easier. Once the review is finished, I will close the PR https://github.com/apache/flink/pull/3715.
    
    Thanks very much.




[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127142150
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.join
    +
    +import java.util
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.codegen.Compiler
    +import org.apache.flink.table.runtime.CRowWrappingCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +/**
    +  * A CoProcessFunction to support stream join stream, currently just support inner-join
    +  *
    +  * @param leftLowerBound
    +  *        the left stream lower bound, and -leftLowerBound is the right stream upper bound
    +  * @param leftUpperBound
    +  *        the left stream upper bound, and -leftUpperBound is the right stream lower bound
    +  * @param element1Type  the input type of left stream
    +  * @param element2Type  the input type of right stream
    +  * @param genJoinFuncName    the function name of other non-equi condition
    +  * @param genJoinFuncCode    the function code of other non-equi condition
    +  *
    +  */
    +class ProcTimeWindowInnerJoin(
    +    private val leftLowerBound: Long,
    +    private val leftUpperBound: Long,
    +    private val element1Type: TypeInformation[Row],
    +    private val element2Type: TypeInformation[Row],
    +    private val genJoinFuncName: String,
    +    private val genJoinFuncCode: String)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +    with Compiler[FlatJoinFunction[Row, Row, Row]]{
    +
    +  private var cRowWrapper: CRowWrappingCollector = _
    +
    +  /** other condition function **/
    +  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
    +
    +  /** tmp list to store expired records **/
    +  private var listToRemove: JList[Long] = _
    +
    +  /** state to hold left stream element **/
    +  private var row1MapState: MapState[Long, JList[Row]] = _
    +  /** state to hold right stream element **/
    +  private var row2MapState: MapState[Long, JList[Row]] = _
    +
    +  /** state to record last timer of left stream, 0 means no timer **/
    +  private var timerState1: ValueState[Long] = _
    +  /** state to record last timer of right stream, 0 means no timer **/
    +  private var timerState2: ValueState[Long] = _
    +
    +  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
    +  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +
    +  override def open(config: Configuration) {
    +    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
    +      s"Code:\n$genJoinFuncCode")
    +    val clazz = compile(
    +      getRuntimeContext.getUserCodeClassLoader,
    +      genJoinFuncName,
    +      genJoinFuncCode)
    +    LOG.debug("Instantiating JoinFunction.")
    +    joinFunction = clazz.newInstance()
    +
    +    listToRemove = new util.ArrayList[Long]()
    +    cRowWrapper = new CRowWrappingCollector()
    +    cRowWrapper.setChange(true)
    +
    +    // initialize row state
    +    val rowListTypeInfo1: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
    +    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
    +    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
    +
    +    val rowListTypeInfo2: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element2Type)
    +    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
    +    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    +    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
    +
    +    val valueStateDescriptor2: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
    +  }
    +
    +  /**
    +    * Process left stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement1(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      leftStreamWinSize,
    +      timerState1,
    +      row1MapState,
    +      row2MapState,
    +      -leftUpperBound,     // right stream lower
    +      -leftLowerBound,     // right stream upper
    +      true
    +    )
    +  }
    +
    +  /**
    +    * Process right stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement2(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      rightStreamWinSize,
    +      timerState2,
    +      row2MapState,
    +      row1MapState,
    +      leftLowerBound,    // left stream lower
    +      leftUpperBound,    // left stream upper
    +      false
    +    )
    +  }
    +
    +  /**
    +    * Called when a processing timer trigger.
    +    * Expire left/right records which earlier than current time - windowsize.
    +    *
    +    * @param timestamp The timestamp of the firing timer.
    +    * @param ctx       The ctx to register timer or get current time
    +    * @param out       The collector for returning result values.
    +    */
    +  override def onTimer(
    +      timestamp: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +      out: Collector[CRow]): Unit = {
    +
    +    if (timerState1.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftStreamWinSize,
    +        row1MapState,
    +        timerState1,
    +        ctx
    +      )
    +    }
    +
    +    if (timerState2.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightStreamWinSize,
    +        row2MapState,
    +        timerState2,
    +        ctx
    +      )
    +    }
    +  }
    +
    +  /**
    +    * Puts an element from the input stream into state and search the other state to
    +    * output records meet the condition, and registers a timer for the current record
    +    * if there is no timer at present.
    +    */
    +  private def processElement(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      winSize: Long,
    +      timerState: ValueState[Long],
    +      rowMapState: MapState[Long, JList[Row]],
    +      oppoRowMapState: MapState[Long, JList[Row]],
    +      oppoLowerBound: Long,
    +      oppoUpperBound: Long,
    +      isLeft: Boolean): Unit = {
    +
    +    cRowWrapper.out = out
    +
    +    val value = valueC.row
    +
    +    val curProcessTime = ctx.timerService.currentProcessingTime
    +    val oppoLowerTime = curProcessTime + oppoLowerBound
    +    val oppoUpperTime = curProcessTime + oppoUpperBound
    +
    +    // only when windowsize != 0, we need to store the element
    +    if (winSize != 0) {
    +      // register a timer to expire the element
    +      if (timerState.value == 0) {
    +        ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize + 1)
    +        timerState.update(curProcessTime + winSize + 1)
    +      }
    +
    +      var rowList = rowMapState.get(curProcessTime)
    +      if (rowList == null) {
    +        rowList = new util.ArrayList[Row]()
    +      }
    +      rowList.add(value)
    +      rowMapState.put(curProcessTime, rowList)
    +
    +    }
    +
    +    // loop the other stream elements
    +    val oppositeKeyIter = oppoRowMapState.keys().iterator()
    +    while (oppositeKeyIter.hasNext) {
    +      val eleTime = oppositeKeyIter.next()
    +      if (eleTime < oppoLowerTime) {
    +        listToRemove.add(eleTime)
    +      } else if (eleTime <= oppoUpperTime) {
    +        val oppoRowList = oppoRowMapState.get(eleTime)
    +        var i = 0
    +        if (isLeft) {
    +          while (i < oppoRowList.size) {
    +            joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
    +            i += 1
    +          }
    +        } else {
    +          while (i < oppoRowList.size) {
    +            joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
    +            i += 1
    +          }
    +        }
    +      }
    +    }
    +
    +    // expire records out-of-time
    +    var i = listToRemove.size - 1
    +    while (i >= 0) {
    +      oppoRowMapState.remove(listToRemove.get(i))
    +      i -= 1
    +    }
    +    listToRemove.clear()
    +  }
    +
    +  /**
    +    * Removes records which are outside the join window from the state.
    +    * Registers a new timer if the state still holds records after the clean-up.
    +    */
    +  private def expireOutTimeRow(
    +      curTime: Long,
    +      winSize: Long,
    +      rowMapState: MapState[Long, JList[Row]],
    +      timerState: ValueState[Long],
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
    +
    +    val expiredTime = curTime - winSize
    +    val keyIter = rowMapState.keys().iterator()
    +    var nextTimer: Long = 0
    +    // Search for expired timestamps.
    +    // If we find a non-expired timestamp, remember the timestamp and leave the loop.
    +    // This way we find all expired timestamps if they are sorted without doing a full pass.
    +    while (keyIter.hasNext && nextTimer == 0) {
    +      val recordTime = keyIter.next
    +      if (recordTime < expiredTime) {
    +        listToRemove.add(recordTime)
    +      } else {
    +        nextTimer = recordTime
    +      }
    +    }
    +
    +    // Remove expired records from state
    +    var i = listToRemove.size - 1
    +    while (i >= 0) {
    +      rowMapState.remove(listToRemove.get(i))
    +      i -= 1
    +    }
    +    listToRemove.clear()
    +
    +    // If the state has non-expired timestamps, register a new timer.
    +    // Otherwise clean the complete state for this input.
    +    if (nextTimer != 0) {
    +      ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
    --- End diff --
    
    `nextTimer` may be neither the smallest nor the greatest of the non-expired timestamps. Would it be better to register a timer for `curTime + winSize + 1` instead?
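
The concern can be sketched outside of Flink. The quoted loop stops at the first non-expired key, which is only the smallest remaining timestamp if the keys are iterated in sorted order. The sketch below is one possible alternative and not the fix adopted in the PR: it makes a full pass and derives the next timer from the minimum remaining timestamp. `ExpirySketch` and `planCleanup` are hypothetical names, and a plain Scala collection stands in for the `MapState` keys.

```scala
object ExpirySketch {
  /** Returns the expired keys and, if records remain, the next timer timestamp. */
  def planCleanup(
      keys: Iterable[Long],
      curTime: Long,
      winSize: Long): (Seq[Long], Option[Long]) = {
    val expiredTime = curTime - winSize
    // Full pass: do not assume that the keys arrive in sorted order.
    val (expired, remaining) = keys.toSeq.partition(_ < expiredTime)
    // The earliest remaining record is the next one to fall out of the window.
    val nextTimer =
      if (remaining.isEmpty) None else Some(remaining.min + winSize + 1)
    (expired, nextTimer)
  }
}
```

For the unordered keys `50, 10, 30` with `curTime = 40` and `winSize = 20`, the first non-expired key encountered is 50, whereas the earliest remaining record is 30. Registering `curTime + winSize + 1`, as proposed in the comment, avoids the full pass at the cost of a possibly later clean-up.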



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127129047
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.join
    +
    +import java.math.{BigDecimal => JBigDecimal}
    +import java.util
    +
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.SqlKind
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.{TableConfig, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
    +import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
    +import org.apache.flink.types.Row
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * A utility class to help analyze and build join code.
    +  */
    +object WindowJoinUtil {
    +
    +  /**
    +    * Analyze time-condition to get time boundary for each stream and get the time type
    +    * and return remain condition.
    +    *
    +    * @param  condition           join condition
    +    * @param  leftLogicalFieldCnt left stream logical field num
    +    * @param  inputSchema         join rowtype schema
    +    * @param  rexBuilder          util to build rexNode
    +    * @param  config              table environment config
    +    * @return isRowTime, left lower boundary, left upper boundary, remain condition
    +    */
    +  private[flink] def analyzeTimeBoundary(
    +      condition: RexNode,
    +      leftLogicalFieldCnt: Int,
    +      inputSchema: RowSchema,
    +      rexBuilder: RexBuilder,
    +      config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
    +
    +    // Converts the condition to conjunctive normal form (CNF)
    +    val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
    +
    +    // split the condition into time indicator condition and other condition
    +    val (timeTerms, remainTerms) = cnfCondition match {
    +      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
    +        c.getOperands.asScala
    +          .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
    +          .reduceLeft((l, r) => {
    +            (l._1 ++ r._1, l._2 ++ r._2)
    +          })
    +      case _ =>
    +        throw new TableException("A time-based stream join requires exactly " +
    +          "two join predicates that bound the time in both directions.")
    +    }
    +
    +    if (timeTerms.size != 2) {
    +      throw new TableException("A time-based stream join requires exactly " +
    +        "two join predicates that bound the time in both directions.")
    +    }
    +
    +    // extract time offset from the time indicator condition
    +    val streamTimeOffsets =
    +    timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, config))
    +
    +    val (leftLowerBound, leftUpperBound) =
    +      streamTimeOffsets match {
    +        case Seq((x, true), (y, false)) => (x, y)
    +        case Seq((x, false), (y, true)) => (y, x)
    +        case _ =>
    +          throw new TableException(
    +            "Time-based join conditions must reference the time attribute of both input tables.")
    +      }
    +
    +    // compose the remain condition list into one condition
    +    val remainCondition =
    +    remainTerms match {
    +      case Seq() => None
    +      case _ =>
    +        // Converts logical field references to physical ones.
    +        Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
    +          RelOptUtil.andJoinFilters(rexBuilder, l, r)
    +        }))
    +    }
    +
    +    val isRowTime: Boolean = timeTerms(0)._1 match {
    +      case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
    +      case _ => true
    +    }
    +    (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
    +  }
    +
    +  /**
    +    * Split the join conditions into time condition and non-time condition
    +    *
    +    * @return (Seq(timeTerms), Seq(remainTerms)),
    +    */
    +  private def analyzeCondtionTermType(
    +      conditionTerm: RexNode,
    +      leftFieldCount: Int,
    +      inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], Seq[RexNode]) = {
    +
    +    conditionTerm match {
    +      case c: RexCall if Seq(SqlKind.GREATER_THAN, SqlKind.GREATER_THAN_OR_EQUAL,
    +        SqlKind.LESS_THAN, SqlKind.LESS_THAN_OR_EQUAL).contains(c.getKind) =>
    +        val timeIndicators = extractTimeIndicatorAccesses(c, leftFieldCount, inputType)
    +        timeIndicators match {
    +          case Seq() =>
    +            (Seq(), Seq(c))
    +          case Seq(v1, v2) =>
    +            if (v1._1 != v2._1) {
    +              throw new TableException(
    +                "Both time attributes in a join condition must be of the same type.")
    +            }
    +            if (v1._2 == v2._2) {
    +              throw new TableException("Time-based join conditions " +
    +                "must reference the time attribute of both input tables.")
    +            }
    +            (Seq((v1._1, v1._2, c)), Seq())
    +          case _ =>
    +            throw new TableException(
    +              "Time-based join conditions must reference the time attribute of both input tables.")
    +        }
    +      case other =>
    +        val timeIndicators = extractTimeIndicatorAccesses(other, leftFieldCount, inputType)
    +        timeIndicators match {
    +          case Seq() =>
    +            (Seq(), Seq(other))
    +          case _ =>
    +            throw new TableException("Time indicators can not be used in non time-condition.")
    +        }
    +    }
    +  }
    +
    +  /**
    +    * Extracts all time indicator attributes that are accessed in an expression.
    +    *
    +    * @return seq(timeType, is left input time indicator)
    +    */
    +  def extractTimeIndicatorAccesses(
    +      expression: RexNode,
    +      leftFieldCount: Int,
    +      inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
    +
    +    expression match {
    +      case i: RexInputRef =>
    +        val idx = i.getIndex
    +        inputType.getFieldList.get(idx).getType match {
    +          case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
    +            // left table time indicator
    +            Seq((t, true))
    +          case t: TimeIndicatorRelDataType =>
    +            // right table time indicator
    +            Seq((t, false))
    +          case _ => Seq()
    +        }
    +      case c: RexCall =>
    +        c.operands.asScala
    +          .map(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
    +          .reduce(_ ++ _)
    +      case _ => Seq()
    +    }
    +  }
    +
    +  /**
    +    * Computes the absolute bound on the left operand of a comparison expression and
    +    * whether the bound is an upper or lower bound.
    +    *
    +    * @return window boundary, is left lower bound
    +    */
    +  def extractTimeOffsetFromCondition(
    +      timeTerm: RexNode,
    +      isLeftExprBelongLeftTable: Boolean,
    +      rexBuilder: RexBuilder,
    +      config: TableConfig): (Long, Boolean) = {
    +
    +    val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
    +
    +    val isLeftLowerBound: Boolean =
    +      timeTerm.getKind match {
    +        // e.g a.proctime > b.proctime - 5 sec, then it's the lower bound of a and the value is -5
    +        // e.g b.proctime > a.proctime - 5 sec, then it's not the lower bound of a but upper bound
    +        case kind@(SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
    +          isLeftExprBelongLeftTable
    +        // e.g a.proctime < b.proctime + 5 sec, the the upper bound of a is 5
    +        case kind@(SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
    +          !isLeftExprBelongLeftTable
    +        case _ =>
    +          throw new TableException("Unsupported time-condition.")
    +      }
    +
    +    val (leftLiteral, rightLiteral) =
    +      reduceTimeExpression(
    +        timeCall.operands.get(0),
    +        timeCall.operands.get(1),
    +        rexBuilder,
    +        config)
    +    val tmpTimeOffset: Long =
    +      if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else leftLiteral - rightLiteral
    +
    +    val boundary =
    +      tmpTimeOffset.signum * (
    +        if (timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == SqlKind.GREATER_THAN) {
    +          tmpTimeOffset.abs - 1
    +        } else {
    +          tmpTimeOffset.abs
    +        })
    +
    +    (boundary, isLeftLowerBound)
    +  }
    +
    +  /**
    +    * Calculates the time boundary by replacing the time attribute by a zero literal
    +    * and reducing the expression.
    +    * For example:
    +    * b.proctime - interval '1' second - interval '2' second will be translated to
    +    * 0 - 1000 - 2000
    +    */
    +  private def reduceTimeExpression(
    +      leftRexNode: RexNode,
    +      rightRexNode: RexNode,
    +      rexBuilder: RexBuilder,
    +      config: TableConfig): (Long, Long) = {
    +
    +    /**
    +      * replace the rowtime/proctime with zero literal.
    +      */
    +    def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
    +      expr match {
    +        case c: RexCall =>
    +          // replace in call operands
    +          val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral(_)).asJava
    +          rexBuilder.makeCall(c.getType, c.getOperator, newOps)
    +        case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) =>
    +          // replace with timestamp
    +          rexBuilder.makeZeroLiteral(expr.getType)
    +        case _: RexInputRef =>
    +          throw new TableException("Time join condition may only reference time indicator fields.")
    +        case _ => expr
    +      }
    +    }
    +
    +    val literalLeftRex = replaceTimeFieldWithLiteral(leftRexNode)
    +    val literalRightRex = replaceTimeFieldWithLiteral(rightRexNode)
    +
    +    val exprReducer = new ExpressionReducer(config)
    +    val originList = new util.ArrayList[RexNode]()
    +    originList.add(literalLeftRex)
    +    originList.add(literalRightRex)
    +    val reduceList = new util.ArrayList[RexNode]()
    +    exprReducer.reduce(rexBuilder, originList, reduceList)
    +
    +    val literals = reduceList.asScala.map(f => f match {
    --- End diff --
    
    This can be simplified to:
    
    ```scala
    val literals = reduceList.asScala.map {
      case literal: RexLiteral =>
        literal.getValue2.asInstanceOf[Long]
      case _ =>
        throw new TableException(
          "Time condition may only consist of time attributes, literals, and arithmetic operators.")
    }
    ```
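
    The simplification relies on Scala's pattern-matching anonymous functions: `xs.map { case ... }` is shorthand for `xs.map(x => x match { case ... })`. Below is a minimal sketch without Calcite on the classpath; `Node`, `Literal`, and `Ref` are hypothetical stand-ins for `RexNode`, `RexLiteral`, and any other node type, and `IllegalArgumentException` stands in for `TableException`.

    ```scala
    // Hypothetical stand-ins for RexNode / RexLiteral; these are not Calcite types.
    sealed trait Node
    final case class Literal(value: Long) extends Node
    final case class Ref(index: Int) extends Node

    object LiteralExtraction {
      // Verbose form, as in the diff: an explicit lambda that wraps a match.
      def verbose(nodes: Seq[Node]): Seq[Long] =
        nodes.map(f => f match {
          case Literal(v) => v
          case _ => throw new IllegalArgumentException("not a literal")
        })

      // Equivalent form using a pattern-matching anonymous function.
      def concise(nodes: Seq[Node]): Seq[Long] =
        nodes.map {
          case Literal(v) => v
          case _ => throw new IllegalArgumentException("not a literal")
        }
    }
    ```

    Both forms behave the same; the second only drops the redundant `f => f match` wrapper.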


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127173868
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
    +import org.apache.flink.table.plan.nodes.CommonJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +import org.apache.flink.table.updateutils.UpdateCheckUtils
    +
    +/**
    +  * Flink RelNode which matches along with JoinOperator and its related operations.
    +  */
    +class DataStreamWindowJoin(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    leftNode: RelNode,
    +    rightNode: RelNode,
    +    joinCondition: RexNode,
    +    joinType: JoinRelType,
    +    leftSchema: RowSchema,
    +    rightSchema: RowSchema,
    +    schema: RowSchema,
    +    isRowTime: Boolean,
    +    leftLowerBound: Long,
    +    leftUpperBound: Long,
    +    remainCondition: Option[RexNode],
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, leftNode, rightNode)
    +    with CommonJoin
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = schema.logicalType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamWindowJoin(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      joinCondition,
    +      joinType,
    +      leftSchema,
    +      rightSchema,
    +      schema,
    +      isRowTime,
    +      leftLowerBound,
    +      leftUpperBound,
    +      remainCondition,
    +      ruleDescription)
    +  }
    +
    +  override def toString: String = {
    +    joinToString(
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    joinExplainTerms(
    +      super.explainTerms(pw),
    +      schema.logicalType,
    +      joinCondition,
    +      joinType,
    +      getExpressionString)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: StreamTableEnvironment,
    +      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    +
    +    val config = tableEnv.getConfig
    +
    +    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
    --- End diff --
    
    The following SQL `select a, sum(b), a+1 from t1 group by a` will be optimized into the following nodes:
    
    ```
    DataStreamCalc (AccRetract, producesUpdates=false)
        DataStreamGroupAggregate (AccRetract, producesUpdates=true)
            DataStreamScan (Acc, producesUpdates=false)
    ```
    The DataStreamCalc is append-only, but it is in AccRetract mode, which means its output contains retractions.
    
    I think we want to check whether the input contains retractions, right?
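
    To make the distinction concrete, here is a minimal sketch that models the three nodes listed above. `PlanNode`, `UpdateMode`, and both check functions are hypothetical and are not Flink classes; in particular, treating "append-only" as the node-local `producesUpdates` flag is an assumption taken from the listing, not from the actual `UpdateCheckUtils` implementation.

    ```scala
    // Hypothetical model of the plan above; none of these are Flink classes.
    sealed trait UpdateMode
    case object Acc extends UpdateMode
    case object AccRetract extends UpdateMode

    final case class PlanNode(
        name: String,
        mode: UpdateMode,
        producesUpdates: Boolean,
        input: Option[PlanNode] = None)

    object RetractionCheck {
      // Assumption: "append-only" only looks at the node's own producesUpdates flag.
      def isAppendOnly(node: PlanNode): Boolean = !node.producesUpdates

      // Assumption: a node emits retraction messages exactly when it runs in AccRetract mode.
      def containsRetraction(node: PlanNode): Boolean = node.mode == AccRetract

      val scan = PlanNode("DataStreamScan", Acc, producesUpdates = false)
      val agg = PlanNode("DataStreamGroupAggregate", AccRetract, producesUpdates = true, Some(scan))
      val calc = PlanNode("DataStreamCalc", AccRetract, producesUpdates = false, Some(agg))
    }
    ```

    Under these assumptions `isAppendOnly(calc)` is true while `containsRetraction(calc)` is also true, so an append-only check on the join input would accept a stream that still carries retractions.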
    
    
        



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r126680468
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.join
    +
    +import java.math.{BigDecimal => JBigDecimal}
    +import java.util
    +
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.SqlKind
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.{TableConfig, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
    +import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
    +import org.apache.flink.types.Row
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * An util class to help analyze and build join code .
    +  */
    +object WindowJoinUtil {
    +
    +  /**
    +    * Analyze time-condtion to get time boundary for each stream and get the time type
    +    * and return remain condition.
    +    *
    +    * @param  condition           join condition
    +    * @param  leftLogicalFieldCnt left stream logical field num
    +    * @param  inputSchema         join rowtype schema
    +    * @param  rexBuilder          util to build rexNode
    +    * @param  config              table environment config
    +    * @return isRowTime, left lower boundary, right lower boundary, remain condition
    +    */
    +  private[flink] def analyzeTimeBoundary(
    +      condition: RexNode,
    +      leftLogicalFieldCnt: Int,
    +      inputSchema: RowSchema,
    +      rexBuilder: RexBuilder,
    +      config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
    +
    +    // Converts the condition to conjunctive normal form (CNF)
    +    val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
    +
    +    // split the condition into time indicator condition and other condition
    +    val (timeTerms, remainTerms) = cnfCondition match {
    +      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
    +        c.getOperands.asScala
    +          .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, inputSchema.logicalType))
    +          .reduceLeft((l, r) => {
    +            (l._1 ++ r._1, l._2 ++ r._2)
    +          })
    +      case _ =>
    +        throw new TableException("A time-based stream join requires exactly " +
    +          "two join predicates that bound the time in both directions.")
    +    }
    +
    +    if (timeTerms.size != 2) {
    +      throw new TableException("A time-based stream join requires exactly " +
    +        "two join predicates that bound the time in both directions.")
    +    }
    +
    +    // extract time offset from the time indicator conditon
    --- End diff --
    
    minor typo: conditon -> condition



[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4266#discussion_r127153991
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.join
    +
    +import java.util
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.codegen.Compiler
    +import org.apache.flink.table.runtime.CRowWrappingCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +/**
    +  * A CoProcessFunction to support stream join stream, currently just support inner-join
    +  *
    +  * @param leftLowerBound
    +  *        the left stream lower bound, and -leftLowerBound is the right stream upper bound
    +  * @param leftUpperBound
    +  *        the left stream upper bound, and -leftUpperBound is the right stream lower bound
    +  * @param element1Type  the input type of left stream
    +  * @param element2Type  the input type of right stream
    +  * @param genJoinFuncName    the function code of other non-equi condition
    +  * @param genJoinFuncCode    the function name of other non-equi condition
    +  *
    +  */
    +class ProcTimeWindowInnerJoin(
    +    private val leftLowerBound: Long,
    +    private val leftUpperBound: Long,
    +    private val element1Type: TypeInformation[Row],
    +    private val element2Type: TypeInformation[Row],
    +    private val genJoinFuncName: String,
    +    private val genJoinFuncCode: String)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +    with Compiler[FlatJoinFunction[Row, Row, Row]]{
    +
    +  private var cRowWrapper: CRowWrappingCollector = _
    +
    +  /** other condition function **/
    +  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
    +
    +  /** tmp list to store expired records **/
    +  private var listToRemove: JList[Long] = _
    +
    +  /** state to hold left stream element **/
    +  private var row1MapState: MapState[Long, JList[Row]] = _
    +  /** state to hold right stream element **/
    +  private var row2MapState: MapState[Long, JList[Row]] = _
    +
    +  /** state to record last timer of left stream, 0 means no timer **/
    +  private var timerState1: ValueState[Long] = _
    +  /** state to record last timer of right stream, 0 means no timer **/
    +  private var timerState2: ValueState[Long] = _
    +
    +  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
    +  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +
    +  override def open(config: Configuration) {
    +    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
    +      s"Code:\n$genJoinFuncCode")
    +    val clazz = compile(
    +      getRuntimeContext.getUserCodeClassLoader,
    +      genJoinFuncName,
    +      genJoinFuncCode)
    +    LOG.debug("Instantiating JoinFunction.")
    +    joinFunction = clazz.newInstance()
    +
    +    listToRemove = new util.ArrayList[Long]()
    +    cRowWrapper = new CRowWrappingCollector()
    +    cRowWrapper.setChange(true)
    +
    +    // initialize row state
    +    val rowListTypeInfo1: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
    +    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
    +    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
    +
    +    val rowListTypeInfo2: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element2Type)
    +    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
    +    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    +    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
    +
    +    val valueStateDescriptor2: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
    +  }
    +
    +  /**
    +    * Process left stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement1(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      leftStreamWinSize,
    +      timerState1,
    +      row1MapState,
    +      row2MapState,
    +      -leftUpperBound,     // right stream lower
    +      -leftLowerBound,     // right stream upper
    +      true
    +    )
    +  }
    +
    +  /**
    +    * Process right stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement2(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      rightStreamWinSize,
    +      timerState2,
    +      row2MapState,
    +      row1MapState,
    +      leftLowerBound,    // left stream upper
    +      leftUpperBound,    // left stream upper
    +      false
    +    )
    +  }
    +
    +  /**
    +    * Called when a processing timer trigger.
    +    * Expire left/right records which earlier than current time - windowsize.
    +    *
    +    * @param timestamp The timestamp of the firing timer.
    +    * @param ctx       The ctx to register timer or get current time
    +    * @param out       The collector for returning result values.
    +    */
    +  override def onTimer(
    +      timestamp: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +      out: Collector[CRow]): Unit = {
    +
    +    if (timerState1.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftStreamWinSize,
    +        row1MapState,
    +        timerState1,
    +        ctx
    +      )
    +    }
    +
    +    if (timerState2.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightStreamWinSize,
    +        row2MapState,
    +        timerState2,
    +        ctx
    +      )
    +    }
    +  }
    +
    +  /**
    +    * Puts an element from the input stream into state and search the other state to
    +    * output records meet the condition, and registers a timer for the current record
    +    * if there is no timer at present.
    +    */
    +  private def processElement(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      winSize: Long,
    +      timerState: ValueState[Long],
    +      rowMapState: MapState[Long, JList[Row]],
    +      oppoRowMapState: MapState[Long, JList[Row]],
    +      oppoLowerBound: Long,
    +      oppoUpperBound: Long,
    +      isLeft: Boolean): Unit = {
    +
    +    cRowWrapper.out = out
    +
    +    val value = valueC.row
    +
    +    val curProcessTime = ctx.timerService.currentProcessingTime
    +    val oppoLowerTime = curProcessTime + oppoLowerBound
    +    val oppoUpperTime = curProcessTime + oppoUpperBound
    +
    +    // only when windowsize != 0, we need to store the element
    +    if (winSize != 0) {
    --- End diff --
    
    I'm not sure about this. For example, take `a.proctime between b.proctime - 5 and b.proctime`. In this case, we buffer stream `a` for a window size of 5, but we do not buffer stream `b` because the right window size is 0.
    
    Suppose the input elements are [a1, 1], [a2, 2], [b1, 5], [a3, 5]. The first field of each tuple indicates which stream the element belongs to; the second field is its processing timestamp. The expected result is `a1, b1`, `a2, b1`, `a3, b1`, but the actual result misses `a3, b1` because we didn't buffer the elements from stream `b`.
    
    So I think we still need to store the elements even if the window size is 0. Of course, we would register a `curTime + 1` timer to clean up the state.
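
    The scenario can be replayed with a small simulation. This is a simplified sketch and not the operator from the PR: `Event`, `ZeroWindowJoin`, and the `bufferZeroSizeWindow` flag are hypothetical names, buffers are plain in-memory vectors instead of `MapState`, and timer-based expiry is left out entirely.

    ```scala
    // Hypothetical, simplified model of the proctime window join; not the PR's operator.
    final case class Event(id: String, isLeft: Boolean, time: Long)

    object ZeroWindowJoin {
      // Bounds for: a.proctime between b.proctime - 5 and b.proctime
      val leftLowerBound = -5L
      val leftUpperBound = 0L

      // True if left element a and right element b satisfy the time condition.
      def matches(a: Event, b: Event): Boolean = {
        val diff = a.time - b.time
        diff >= leftLowerBound && diff <= leftUpperBound
      }

      // Replays events in arrival order. Each event first probes the opposite buffer
      // and is then stored, unless its window size is 0 and bufferZeroSizeWindow is false.
      def run(events: Seq[Event], bufferZeroSizeWindow: Boolean): Seq[(String, String)] = {
        val leftWinSize = if (leftLowerBound < 0) -leftLowerBound else 0L
        val rightWinSize = if (leftUpperBound > 0) leftUpperBound else 0L
        var leftBuf = Vector.empty[Event]
        var rightBuf = Vector.empty[Event]
        val out = Vector.newBuilder[(String, String)]
        for (e <- events) {
          if (e.isLeft) {
            rightBuf.filter(b => matches(e, b)).foreach(b => out += ((e.id, b.id)))
            if (leftWinSize != 0 || bufferZeroSizeWindow) leftBuf = leftBuf :+ e
          } else {
            leftBuf.filter(a => matches(a, e)).foreach(a => out += ((a.id, e.id)))
            if (rightWinSize != 0 || bufferZeroSizeWindow) rightBuf = rightBuf :+ e
          }
        }
        out.result()
      }

      val input = Seq(
        Event("a1", isLeft = true, 1L),
        Event("a2", isLeft = true, 2L),
        Event("b1", isLeft = false, 5L),
        Event("a3", isLeft = true, 5L))
    }
    ```

    With `bufferZeroSizeWindow = false` the replay of `input` yields only `(a1, b1)` and `(a2, b1)`; with `true` it also yields `(a3, b1)`, because `b1` is still in the buffer when `a3` arrives at the same timestamp.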

