Posted to issues@flink.apache.org by hequn8128 <gi...@git.apache.org> on 2017/04/07 10:32:27 UTC

[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

GitHub user hequn8128 opened a pull request:

    https://github.com/apache/flink/pull/3696

    [FLINK-6090] [table] Add RetractionRule at the stage of decoration

    Add RetractionRules at the stage of decoration. These rules derive the NeedRetraction property and the accumulating mode (AccMode). There are three rules:
    1. InitProcessRule: initializes the NeedRetraction property and AccMode for DataStreamRels.
    2. NeedToRetractProcessRule: derives the NeedRetraction property.
    3. AccModeProcessRule: finds all AccRetract nodes, i.e. derives the accumulating mode.
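    To make the division of labour between the three rules concrete, here is a minimal sketch of the three passes over a toy plan tree. It is a simplified model under stated assumptions, not the PR's code: `Node`, `aggKinds` and the recursive pass functions are hypothetical stand-ins for `DataStreamRel`, `RetractionTrait` and Calcite's rule matching. It encodes the behaviour described in the reviewed diff: aggregations need retraction from their inputs, `needToRetract` propagates upstream, a `DataStreamGroupAggregate` that must retract works in AccRetract mode, and operators that cannot digest retractions forward that mode downstream.

    ```scala
    // Hypothetical, simplified model of the three decoration passes. Node and the
    // kind strings are illustrative stand-ins, not Flink or Calcite classes.
    sealed trait AccMode
    case object Acc extends AccMode
    case object AccRetract extends AccMode

    case class Node(
        kind: String,
        inputs: List[Node],
        needToRetract: Boolean = false,
        accMode: AccMode = Acc)

    // Operators that need retractions from their inputs (and can digest them).
    val aggKinds = Set("GroupAggregate", "GroupWindowAggregate", "OverAggregate")

    // Pass 1 (InitProcessRule): needToRetract = false, AccMode = Acc everywhere.
    def initProcess(n: Node): Node =
      n.copy(inputs = n.inputs.map(initProcess), needToRetract = false, accMode = Acc)

    // Pass 2 (NeedToRetractProcessRule), top-down: an input must retract if its
    // parent is an aggregation or is itself marked needToRetract.
    def needToRetractProcess(n: Node): Node = {
      val parentNeeds = aggKinds(n.kind) || n.needToRetract
      n.copy(inputs = n.inputs.map { in =>
        needToRetractProcess(if (parentNeeds) in.copy(needToRetract = true) else in)
      })
    }

    // Pass 3 (AccModeProcessRule), bottom-up: a GroupAggregate that must retract
    // produces retractions; a non-aggregating operator above an AccRetract input
    // forwards them.
    def accModeProcess(n: Node): Node = {
      val newInputs = n.inputs.map(accModeProcess)
      val produces = n.needToRetract && n.kind == "GroupAggregate"
      val forwards = newInputs.exists(_.accMode == AccRetract) && !aggKinds(n.kind)
      n.copy(inputs = newInputs, accMode = if (produces || forwards) AccRetract else n.accMode)
    }

    // Scan -> GroupAggregate -> Calc -> GroupAggregate (the root)
    val plan = Node("GroupAggregate",
      List(Node("Calc", List(Node("GroupAggregate", List(Node("Scan", Nil)))))))
    val decorated = accModeProcess(needToRetractProcess(initProcess(plan)))
    ```

    In this model the inner aggregation ends up in AccRetract mode, the Calc between the two aggregations forwards its retractions, and the root aggregation digests them and stays in Acc mode. Running the passes as plain recursion stands in for the HepPlanner repeatedly firing each rule until nothing changes.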
    
    - [x] General
      - The pull request references the related JIRA issue [FLINK-6090] Add RetractionRule at the stage of decoration
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hequn8128/flink FLINK-6090

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3696
    
----
commit c4edd908e2a251934ab6992dd1826a58a4d10b65
Author: hequn.chq <he...@alibaba-inc.com>
Date:   2017-04-07T05:12:04Z

    [FLINK-6090] [table] Add RetractionRule at the stage of decoration

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3696: [FLINK-6090] [table] Add RetractionRule at the stage of d...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3696
  
    Hi @hequn8128 and @shaoxuan-wang, I merged this PR to the `table-retraction` branch.
    Could you close it? Thanks!


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110390018
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, window (including group window and over window) does not contain early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] will digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction because
    +      * it digests retraction first then generate new retraction messages by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    +
    +      // check if there is a bottom RelNode working under AccRetract
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        if (traitSetContainAccRetract(bottomRels(i).getTraitSet)) {
    +          bottomRelsUnderAccRetract = true
    +        }
    +        i += 1
    +      }
    +
    +      // Return true
    +      // if topRel needs to work under AccRetract by itself
    +      // or bottom relNodes make topRel to work under AccRetract
    +      needSet ||
    +        (bottomRelsUnderAccRetract
    +          && !canDigestRetraction(topRel)
    +          && !traitSetContainAccRetract(traitSet))
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with AccMode properly setted. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def accModeProcess(topRel: RelNode, bottomRels: ListBuffer[RelNode]): (RelNode, Boolean) = {
    +      var needTransform = false
    +      var i = 0
    +      var newTopRel = topRel
    +
    +      // process bottom RelNodes
    +      while (i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(needSetAccRetract(bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = setAccRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +
    +      // process top RelNode
    +      if (topRelNeedSetAccRetract(topRel, bottomRels)) {
    +        needTransform = true
    +        newTopRel = setAccRetract(topRel)
    +      }
    +
    +      newTopRel = newTopRel.copy(newTopRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom and top relnodes
    +      val (newTopRel, needTransform) = accModeProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +  }
    +
    +
    +  /**
    +    * Rule that init retraction trait inside a [[DataStreamRel]]. If a [[DataStreamRel]] does not
    +    * contain retraction trait, initialize one for the [[DataStreamRel]]. After
    +    * initiallization, needToRetract will be set to false and AccMode will be set to Acc.
    --- End diff --
    
    "initiallization" -> "initialization"


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110392776
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    --- End diff --
    
    Idiomatic Scala discourages mutable collections, so I think we should not modify the parameter list here. Instead, we should build a new list and create the new top rel from it. For example:
    
    ```scala
    val transformedBottom = for (b <- bottomRels) yield {
      if (bottomNeedToRetract(topRel, b)) {
        needTransform = true
        addNeedToRetract(b)
      } else {
        b
      }
    }
    ```
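    A variant of this suggestion that also avoids the `var needTransform`: map each input to a (possibly marked) copy plus a flag, then derive the flag with `exists`. This is a hedged sketch; `Rel` and `transformInputs` are hypothetical names, and in the rule itself `needs` would presumably be `bottomNeedToRetract(topRel, _)` and `mark` would be `addNeedToRetract`.

    ```scala
    // Hypothetical stand-in for a RelNode carrying only a needToRetract flag.
    case class Rel(name: String, needToRetract: Boolean)

    // Maps every input, then derives the "changed" flag from the results, so
    // neither the input Seq nor a var is mutated.
    def transformInputs[A](inputs: Seq[A])(needs: A => Boolean)(mark: A => A): (Seq[A], Boolean) = {
      val results = inputs.map(b => if (needs(b)) (mark(b), true) else (b, false))
      (results.map(_._1), results.exists(_._2))
    }

    val (newInputs, needTransform) =
      transformInputs(Seq(Rel("a", false), Rel("b", true)))(r => !r.needToRetract)(_.copy(needToRetract = true))
    ```

    Deriving the flag from the results keeps the function free of side effects, at the cost of allocating one tuple per input.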


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110393126
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    --- End diff --
    
    We only need to create the newTopRel if `needTransform == true`.
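    One way to follow this is to return an `Option`, so the top node is only copied when at least one input changed; `onMatch` would then call `call.transformTo` only for `Some`. The sketch below is hypothetical: `Rel` is a toy stand-in for `RelNode`, and the aggregation check mirrors `bottomNeedToRetract` from the diff.

    ```scala
    // Hypothetical stand-in for a RelNode with its inputs and a needToRetract flag.
    case class Rel(kind: String, inputs: Seq[Rel], needToRetract: Boolean = false)

    val aggKinds = Set("GroupAggregate", "GroupWindowAggregate", "OverAggregate")

    // Returns Some(newTop) only if at least one input still had to be marked;
    // None means "nothing changed", so no copy of the top node is created.
    def needToRetractProcess(top: Rel): Option[Rel] = {
      val parentNeeds = aggKinds(top.kind) || top.needToRetract
      if (!parentNeeds || top.inputs.forall(_.needToRetract)) None
      else Some(top.copy(inputs = top.inputs.map(_.copy(needToRetract = true))))
    }
    ```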


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110388706
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    --- End diff --
    
    Change the return type to `Seq[RelNode]`.
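
A minimal sketch of what the signature change could look like, again with hypothetical toy types: `Vertex` stands in for Calcite's `HepRelVertex` and `currentRel` for `getCurrentRel`. Returning an immutable `Seq` built with `map` avoids handing callers a mutable `ListBuffer`. A caller that wants to substitute a child builds a new sequence, for example with `updated`, instead of mutating the buffer in place.

```scala
object SeqReturnType {
  // Hypothetical toy types: Vertex plays the role of HepRelVertex.
  final case class Rel(name: String, inputs: Seq[Vertex])
  final case class Vertex(currentRel: Rel)

  // Returns an immutable Seq: callers cannot modify the result in place.
  def getChildRelNodes(topRel: Rel): Seq[Rel] = topRel.inputs.map(_.currentRel)

  // A caller that needs to replace one child creates a new Seq with `updated`.
  def replaceChild(children: Seq[Rel], index: Int, newChild: Rel): Seq[Rel] =
    children.updated(index, newChild)
}
```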



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111320841
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---
    @@ -36,14 +36,14 @@ import org.apache.flink.types.Row
     class DataStreamCorrelate(
         cluster: RelOptCluster,
         traitSet: RelTraitSet,
    -    inputNode: RelNode,
    --- End diff --
    
    Hi, sorry for not leaving any comments about this fix. After the decoration phase, the class type of `inputNode` is `HepRelVertex`, so `val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)` throws a `ClassCastException`. You can reproduce this exception by running the tests in `DataStreamUserDefinedFunctionITCase`. There is no problem after `runVolcanoPlanner` because `DataStreamCorrelateRule` does the transformation from `RelSubset` to `DataStreamRel`. I think it's better to override the `input` parameter and use `getInput` when translating to a plan. What do you think? Thanks!
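
The failure mode described above can be reproduced in miniature. This sketch rests on one assumption: as in Calcite's `HepPlanner`, the final plan is built by unwrapping each vertex input and writing it back through `replaceInput`. A separate constructor parameter that merely copied the original input is never updated, so it still holds the wrapper. All types here are hypothetical toys, not Calcite's API.

```scala
object StaleInputField {
  trait RelNode
  // Hypothetical stand-in for a concrete DataStreamRel.
  final case class StreamScan(table: String) extends RelNode {
    def translateToPlan(): String = s"scan($table)"
  }
  // Hypothetical stand-in for HepRelVertex, which wraps the current node.
  final case class Vertex(current: RelNode) extends RelNode

  // Toy single-input node: `input` is the field the planner rewrites,
  // while `inputNode` is a constructor copy that nobody updates.
  final class Correlate(inputNode: RelNode) {
    private var input: RelNode = inputNode
    def getInput: RelNode = input
    def replaceInput(newInput: RelNode): Unit = input = newInput

    // Mirrors the failing code: casts the stale constructor parameter.
    def translateWithStaleParam(): String = inputNode.asInstanceOf[StreamScan].translateToPlan()
    // Mirrors the proposed fix: reads the current input via getInput.
    def translateWithGetInput(): String = getInput.asInstanceOf[StreamScan].translateToPlan()
  }

  // Toy version of building the final plan: unwrap the vertex and write it back.
  def buildFinalPlan(c: Correlate): Correlate = {
    c.getInput match {
      case Vertex(current) => c.replaceInput(current)
      case _               => ()
    }
    c
  }
}
```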



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111244797
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    --- End diff --
    
    This can be simplified to
    `null != retractionTrait && retractionTrait.getNeedToRetract`
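
A sketch of the simplified check, using a hypothetical `RetractionTrait` case class in place of the PR's trait. Because `&&` short-circuits, the field is only read when the trait is non-null. Wrapping the nullable value in `Option` gives an equivalent form with no explicit null comparison.

```scala
object NeedToRetractCheck {
  // Hypothetical stand-in for the PR's RetractionTrait.
  final case class RetractionTrait(needToRetract: Boolean)

  // Short-circuit form suggested in the review.
  def containNeedToRetract(t: RetractionTrait): Boolean =
    null != t && t.needToRetract

  // Equivalent form: Option(null) is None, so exists returns false.
  def containNeedToRetractOpt(t: RetractionTrait): Boolean =
    Option(t).exists(_.needToRetract)
}
```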



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110534557
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    --- End diff --
    
    topRel.getInputs.asScala.map {
      case e: HepRelVertex => e.getCurrentRel
    }
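
This suggestion relies on a pattern-matching anonymous function, which must be written in braces (`map { case ... => ... }`). Placing a `case` clause directly inside parentheses is not valid Scala syntax. The sketch below uses hypothetical toy types, with `Vertex` standing in for `HepRelVertex`. With only one `case`, any input that is not a vertex throws a `MatchError` at runtime. A fallback case avoids that. Alternatively, `collect` silently drops non-matching elements.

```scala
object UnwrapInputs {
  trait Node
  final case class Op(name: String) extends Node
  // Hypothetical stand-in for HepRelVertex.
  final case class Vertex(currentRel: Node) extends Node

  // Braces are required around the case clause; a non-Vertex input throws MatchError.
  def unwrapStrict(inputs: Seq[Node]): Seq[Node] =
    inputs.map { case v: Vertex => v.currentRel }

  // Total version: inputs that are already unwrapped pass through unchanged.
  def unwrap(inputs: Seq[Node]): Seq[Node] =
    inputs.map {
      case v: Vertex => v.currentRel
      case other     => other
    }
}
```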




[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111804596
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---
    @@ -36,14 +36,14 @@ import org.apache.flink.types.Row
     class DataStreamCorrelate(
         cluster: RelOptCluster,
         traitSet: RelTraitSet,
    -    inputNode: RelNode,
    --- End diff --
    
    Ah, OK. Thanks for the explanation!
    That makes sense :-)



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110389978
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, window (including group window and over window) does not contain early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] will digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction because
    +      * it digests retraction first then generate new retraction messages by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    +
    +      // check if there is a bottom RelNode working under AccRetract
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        if (traitSetContainAccRetract(bottomRels(i).getTraitSet)) {
    +          bottomRelsUnderAccRetract = true
    +        }
    +        i += 1
    +      }
    +
    +      // Return true
    +      // if topRel needs to work under AccRetract by itself
    +      // or bottom relNodes make topRel to work under AccRetract
    +      needSet ||
    +        (bottomRelsUnderAccRetract
    +          && !canDigestRetraction(topRel)
    +          && !traitSetContainAccRetract(traitSet))
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with AccMode properly setted. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def accModeProcess(topRel: RelNode, bottomRels: ListBuffer[RelNode]): (RelNode, Boolean) = {
    +      var needTransform = false
    +      var i = 0
    +      var newTopRel = topRel
    +
    +      // process bottom RelNodes
    +      while (i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(needSetAccRetract(bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = setAccRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +
    +      // process top RelNode
    +      if (topRelNeedSetAccRetract(topRel, bottomRels)) {
    +        needTransform = true
    +        newTopRel = setAccRetract(topRel)
    +      }
    +
    +      newTopRel = newTopRel.copy(newTopRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom and top relnodes
    +      val (newTopRel, needTransform) = accModeProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +  }
    +
    +
    +  /**
    +    * Rule that init retraction trait inside a [[DataStreamRel]]. If a [[DataStreamRel]] does not
    +    * contain retraction trait, initialize one for the [[DataStreamRel]]. After
    +    * initiallization, needToRetract will be set to false and AccMode will be set to Acc.
    +    */
    +  class InitProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "InitProcessRule") {
    --- End diff --
    
    Rename to "InitRetractionRule"?
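
For readers following the diff quoted above, the ordering constraint in the object's doc comment can be sketched on a toy tree. The three passes run as init, then needToRetract, then AccMode. This is a hypothetical simplification, not the PR's code. It runs each pass as a recursive function over an immutable tree instead of as Calcite rules. It also covers only some of the cases in the diff: aggregates need retraction from their inputs, only `DataStreamGroupAggregate` produces a replace table, and aggregates digest retractions. `DataStreamOverAggregate` is omitted for brevity.

```scala
object RetractionPasses {
  sealed trait AccMode
  case object Acc extends AccMode
  case object AccRetract extends AccMode

  // Hypothetical toy operator kinds mirroring classes matched in the diff.
  sealed trait Kind
  case object Scan extends Kind
  case object Calc extends Kind
  case object GroupAggregate extends Kind
  case object GroupWindowAggregate extends Kind

  final case class Node(kind: Kind, inputs: List[Node],
                        needToRetract: Boolean = false, accMode: AccMode = Acc)

  private def isAggregate(k: Kind): Boolean = k == GroupAggregate || k == GroupWindowAggregate

  // Pass 1 (like InitProcessRule): every node starts with needToRetract = false and Acc.
  def init(n: Node): Node =
    n.copy(inputs = n.inputs.map(init), needToRetract = false, accMode = Acc)

  // Pass 2 (like NeedToRetractProcessRule), top-down: aggregates and nodes that are
  // themselves marked require retraction from their inputs.
  def markNeedToRetract(n: Node): Node = {
    val propagate = isAggregate(n.kind) || n.needToRetract
    val marked = n.inputs.map(in => if (propagate) in.copy(needToRetract = true) else in)
    n.copy(inputs = marked.map(markNeedToRetract))
  }

  // Pass 3 (like AccModeProcessRule), bottom-up: a marked GroupAggregate switches to
  // AccRetract, and AccRetract flows upward through operators that cannot digest it.
  def deriveAccMode(n: Node): Node = {
    val inputs = n.inputs.map(deriveAccMode)
    val bySelf = n.needToRetract && n.kind == GroupAggregate
    val fromInputs = inputs.exists(_.accMode == AccRetract) && !isAggregate(n.kind)
    n.copy(inputs = inputs, accMode = if (bySelf || fromInputs) AccRetract else Acc)
  }

  def run(n: Node): Node = deriveAccMode(markNeedToRetract(init(n)))
}
```

Pass 3 depends on the flags computed by pass 2, which in turn assumes pass 1 has reset every node. That dependency is why the rules must be applied in this order.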



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111245186
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    --- End diff --
    
    I think it would be better to split `RetractionTrait` into two traits. As far as I can tell, the `needToRetract` property is only needed to compute the correct `AccMode`.
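
One possible shape for the split, sketched with hypothetical toy types rather than Calcite's `RelTrait`/`RelTraitDef` API. Here `needToRetract` and the accumulating mode are two independent values, so each rule reads and rewrites only the property it owns. The `AccMode` decision then becomes a function of the other trait. The derivation below follows `needSetAccRetract` from the quoted diff.

```scala
object SplitTraits {
  // Hypothetical trait #1: whether downstream operators need retractions from this node.
  final case class NeedToRetractTrait(needToRetract: Boolean)

  // Hypothetical trait #2: the accumulating mode of the node's output.
  sealed trait AccMode
  case object Acc extends AccMode
  case object AccRetract extends AccMode
  final case class AccModeTrait(mode: AccMode)

  // A toy trait set holding the two traits separately.
  final case class Traits(retract: NeedToRetractTrait, acc: AccModeTrait)

  // Mirrors needSetAccRetract: a replace-table producer that must retract uses AccRetract.
  def deriveAccMode(retract: NeedToRetractTrait, producesReplaceTable: Boolean): AccModeTrait =
    AccModeTrait(if (retract.needToRetract && producesReplaceTable) AccRetract else Acc)

  // Updating one trait leaves the other untouched.
  def markNeedToRetract(t: Traits): Traits = t.copy(retract = NeedToRetractTrait(true))
}
```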



[GitHub] flink issue #3696: [FLINK-6090] [table] Add RetractionRule at the stage of d...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3696
  
    @fhueske, thanks for the review and valuable comments.
    Yes, we had better add the attributes (which provide the information for deriving AccMode) to the DataStreamRel interface, so that the code will be much cleaner and easier to understand.
    
    Thanks,
    Shaoxuan
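
A minimal sketch of that idea with a hypothetical `StreamRel` trait. The method names are illustrative and are not taken from the PR. Each operator declares its own retraction-related properties. The rule then consults those properties instead of pattern-matching on concrete operator classes, as `bottomNeedToRetract` and `canDigestRetraction` do in the quoted diff. With this design, adding a new operator only requires overriding the relevant methods.

```scala
object OperatorProperties {
  // Hypothetical interface: the defaults describe a stateless, pass-through operator.
  trait StreamRel {
    def inputs: Seq[StreamRel]
    /** True if the operator needs retractions from its inputs (e.g. aggregations). */
    def needsRetractionFromInputs: Boolean = false
    /** True if the operator consumes retraction messages instead of forwarding them. */
    def canDigestRetraction: Boolean = false
  }

  final case class Scan() extends StreamRel { def inputs: Seq[StreamRel] = Nil }
  final case class Calc(inputs: Seq[StreamRel]) extends StreamRel
  final case class GroupAggregate(inputs: Seq[StreamRel]) extends StreamRel {
    override def needsRetractionFromInputs: Boolean = true
    override def canDigestRetraction: Boolean = true
  }

  // Rule logic written against the interface, with no match on concrete classes.
  def bottomNeedToRetract(top: StreamRel, topNeedToRetract: Boolean, bottomMarked: Boolean): Boolean =
    !bottomMarked && (top.needsRetractionFromInputs || topNeedToRetract)
}
```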



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110398565
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, window (including group window and over window) does not contain early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] will digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction because
    +      * it digests retraction first then generate new retraction messages by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    +
    +      // check if there is a bottom RelNode working under AccRetract
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        if (traitSetContainAccRetract(bottomRels(i).getTraitSet)) {
    +          bottomRelsUnderAccRetract = true
    +        }
    +        i += 1
    +      }
    +
    +      // Return true
    +      // if topRel needs to work under AccRetract by itself
    +      // or bottom relNodes make topRel to work under AccRetract
    +      needSet ||
    +        (bottomRelsUnderAccRetract
    +          && !canDigestRetraction(topRel)
    +          && !traitSetContainAccRetract(traitSet))
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with AccMode properly setted. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def accModeProcess(topRel: RelNode, bottomRels: ListBuffer[RelNode]): (RelNode, Boolean) = {
    +      var needTransform = false
    +      var i = 0
    +      var newTopRel = topRel
    +
    +      // process bottom RelNodes
    +      while (i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(needSetAccRetract(bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = setAccRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +
    +      // process top RelNode
    +      if (topRelNeedSetAccRetract(topRel, bottomRels)) {
    +        needTransform = true
    +        newTopRel = setAccRetract(topRel)
    +      }
    +
    +      newTopRel = newTopRel.copy(newTopRel.getTraitSet, bottomRels.asJava)
    --- End diff --
    
    We only need to create this if `needTransform` is `true`, right?
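    A minimal sketch of that idea, using a hypothetical immutable `Rel` case class as a stand-in for Calcite's `RelNode` (it only mirrors the input-processing part of `accModeProcess`, not the top-node check). The flag is computed first, and the copy is only created when something actually changes; otherwise the original instance is returned unchanged.

```scala
// Hypothetical stand-in for a RelNode: an immutable node with inputs.
final case class Rel(name: String, accRetract: Boolean, inputs: List[Rel])

// Returns (possibly new top node, needTransform). The copy of the top node is
// only built when at least one input actually has to be updated.
def processInputs(top: Rel, needsUpdate: Rel => Boolean, update: Rel => Rel): (Rel, Boolean) = {
  val needTransform = top.inputs.exists(needsUpdate)
  if (needTransform) {
    // Only now pay for rebuilding the inputs and copying the top node.
    val newInputs = top.inputs.map(in => if (needsUpdate(in)) update(in) else in)
    (top.copy(inputs = newInputs), true)
  } else {
    // Nothing changed: hand back the original instance.
    (top, false)
  }
}
```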


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110403637
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    --- End diff --
    
    Please add a space after `if`: `if (`.


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111247042
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/RetractionTrait.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
    +import org.apache.flink.table.plan.nodes.datastream.AccMode.AccMode
    +
    +/**
    +  * Used to store retraction related properties which is used during rule optimization.
    +  */
    +class RetractionTrait extends RelTrait {
    +
    +  /**
    +    * Defines whether downstream operator need retraction. Please note that needToRetract is
    +    * different from needRetraction. NeedToRetract is a property particular for each operator,
    +    * while NeedRetraction is a property for each input. Most of operators have only one input,
    +    * some operators may have more than one inputs (e.g., join, union), and the property of the
    +    * NeedRetraction could be different across different inputs of the same operator
    +    */
    +  private var needToRetract: Boolean = false
    +
    +  /**
    +    * Defines the accumulating mode for a operator. Basically there are two modes for each
    +    * operator: Accumulating Mode (Acc) and Accumulating and Retracting Mode (AccRetract).
    +    */
    +  private var accMode = AccMode.Acc
    +
    +
    +  def this(needToRetract: Boolean, accMode: AccMode) {
    --- End diff --
    
    I would split this trait into two traits. Unless I'm wrong, we are eventually only interested in the `AccMode`, and `needToRetract` is only needed temporarily to compute the correct `AccMode`.


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110376285
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---
    @@ -36,14 +36,14 @@ import org.apache.flink.types.Row
     class DataStreamCorrelate(
         cluster: RelOptCluster,
         traitSet: RelTraitSet,
    -    inputNode: RelNode,
    --- End diff --
    
    Please revert these changes. We try to avoid reformatting changes because they make PRs harder to review. 


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111246143
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    --- End diff --
    
    I wonder whether it would be better to extend `DataStreamRel` with a couple of methods that expose the retraction behavior of each operator (for example `requiresUpdatesAsRetractions()`, `forwardsRetractions()`, `consumesRetractions()`, `producesRetractions()`, or `producesUpdates()`).
    
    I think hardcoding the operator classes here is not a good solution, because it might easily break if the rules are not correctly updated when the operators change.
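    A rough, Calcite-free sketch of that direction. The method names `requiresUpdatesAsRetractions`, `producesUpdates` and `consumesRetractions` are taken from the suggestion above; `StreamOp`, `Scan`, `Calc`, `GroupAgg` and the two helper functions are hypothetical. The helpers approximate `bottomNeedToRetract` and the AccRetract decision in `topRelNeedSetAccRetract`, but leave out the "already marked" checks the PR uses to stop the rules from firing repeatedly.

```scala
// Hypothetical operator interface: each operator describes its own retraction
// behavior, so the rules no longer pattern-match on concrete operator classes.
trait StreamOp {
  def inputs: Seq[StreamOp]
  // Needs updates from its inputs encoded as retractions (e.g. an aggregate).
  def requiresUpdatesAsRetractions: Boolean = false
  // May emit updates to previously emitted results.
  def producesUpdates: Boolean = false
  // Digests incoming retractions instead of forwarding them.
  def consumesRetractions: Boolean = false
}

final case class Scan(table: String) extends StreamOp {
  val inputs: Seq[StreamOp] = Nil
}

final case class Calc(input: StreamOp) extends StreamOp {
  val inputs: Seq[StreamOp] = Seq(input)
}

final case class GroupAgg(input: StreamOp) extends StreamOp {
  val inputs: Seq[StreamOp] = Seq(input)
  override def requiresUpdatesAsRetractions: Boolean = true
  override def producesUpdates: Boolean = true
  override def consumesRetractions: Boolean = true
}

// Class-agnostic counterpart of bottomNeedToRetract: an input has to retract if
// its consumer asks for retractions or has itself been asked to retract.
def inputNeedsToRetract(consumer: StreamOp, consumerNeedsToRetract: Boolean): Boolean =
  consumer.requiresUpdatesAsRetractions || consumerNeedsToRetract

// Class-agnostic counterpart of the AccRetract decision: an operator works in
// AccRetract if it must retract its own updates, or if it forwards retractions
// that arrive from an input working in AccRetract.
def worksInAccRetract(op: StreamOp, needsToRetract: Boolean, anyInputAccRetract: Boolean): Boolean =
  (needsToRetract && op.producesUpdates) || (anyInputAccRetract && !op.consumesRetractions)
```

    In this model, a group window or over aggregate would override `requiresUpdatesAsRetractions` and `consumesRetractions` but not `producesUpdates`, matching how the PR treats `DataStreamGroupWindowAggregate` and `DataStreamOverAggregate`.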


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110388774
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    --- End diff --
    
    method can be simplified to `topRel.getInputs.asScala.transform(_.asInstanceOf[HepRelVertex].getCurrentRel)`
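    A small, Calcite-free sketch of the simplified helper (`Node`, `Leaf` and `Vertex` are hypothetical stand-ins for `RelNode` and `HepRelVertex`). It uses `map` instead of `transform`: as far as I can tell, in Scala 2.11/2.12 `transform` on the `asScala` wrapper writes back into the wrapped `java.util.List` (the node's own inputs, which would fail at runtime if that list is immutable) and returns a `Buffer` rather than the `ListBuffer` the callers take. The sketch also tolerates inputs that are not wrapped in a vertex instead of casting unconditionally.

```scala
import java.util.{Arrays => JArrays, List => JList}
import scala.collection.JavaConverters._

// Hypothetical stand-ins for Calcite's RelNode and HepRelVertex, so that the
// snippet runs without Calcite on the classpath.
trait Node
final case class Leaf(name: String) extends Node
final class Vertex(val current: Node) extends Node

// Non-mutating variant: `map` builds a new Scala collection and leaves the
// wrapped Java list untouched.
def childNodes(inputs: JList[Node]): List[Node] =
  inputs.asScala.map {
    case v: Vertex => v.current // unwrap the planner vertex
    case other => other         // already a plain node
  }.toList
```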


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110392501
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    --- End diff --
    
    Add a space after `while`: `while (`
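
    Beyond the spacing, the index-based loop in `needToRetractProcess` could also be written with `exists` and `map`, which removes both the mutable counter and the flag. Below is a minimal sketch. It assumes a hypothetical `Node` case class whose retraction trait is reduced to a single `needToRetract` flag, and a stand-in predicate under which only a node named `"agg"` (or a node that itself needs to retract) requests retraction from its inputs:

    ```scala
    // Hypothetical simplified node: the retraction trait is reduced to one flag.
    final case class Node(name: String, needToRetract: Boolean = false)

    object NeedToRetractSketch {
      // Stand-in for bottomNeedToRetract: the child is not yet marked, and the
      // parent is an aggregate (here: named "agg") or needs to retract itself.
      def childNeedsToRetract(parent: Node, child: Node): Boolean =
        !child.needToRetract && (parent.name == "agg" || parent.needToRetract)

      // Returns the updated children plus a "changed" flag, mirroring the
      // (newTopRel, needTransform) tuple in the diff.
      def process(parent: Node, children: Seq[Node]): (Seq[Node], Boolean) = {
        val changed = children.exists(childNeedsToRetract(parent, _))
        val newChildren = children.map { child =>
          if (childNeedsToRetract(parent, child)) child.copy(needToRetract = true)
          else child
        }
        (newChildren, changed)
      }
    }
    ```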


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110401510
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, window (including group window and over window) does not contain early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] will digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction because
    +      * it digests retraction first then generate new retraction messages by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    --- End diff --
    
    ```
    val topNeedsAccRetract = needSetAccRetract(topRel)
    ```
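
    The same idea applies to the other quoted helpers. The null checks and `if (...) true else false` branches in `traitSetContainNeedToRetract`, `traitSetContainAccRetract` and `needSetAccRetract` can each be written as a single expression. Below is a minimal sketch. `RetractionTrait` and `AccMode` here are simplified hypothetical stand-ins, not the PR's classes, and `Option(...)` absorbs a `null` trait lookup:

    ```scala
    object AccMode extends Enumeration { val Acc, AccRetract = Value }

    // Hypothetical stand-in for the PR's RetractionTrait.
    final case class RetractionTrait(needToRetract: Boolean, accMode: AccMode.Value)

    object AccModeSketch {
      // Option(t) is None when the trait lookup returned null, so this yields false.
      def containsNeedToRetract(t: RetractionTrait): Boolean =
        Option(t).exists(_.needToRetract)

      def containsAccRetract(t: RetractionTrait): Boolean =
        Option(t).exists(_.accMode == AccMode.AccRetract)

      // One boolean expression instead of `if (cond) true else false`.
      def needSetAccRetract(t: RetractionTrait, isReplaceTable: Boolean): Boolean =
        containsNeedToRetract(t) && isReplaceTable && !containsAccRetract(t)
    }
    ```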


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111246693
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    --- End diff --
    
    Usually, `top` denotes the root node of a tree and `bottom` its leaf nodes. I think `parent` and `children` / `child` would be more appropriate terms here.
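
    The rule only ever inspects one node and its direct inputs, and these are generally not the leaves of the plan. Below is a minimal sketch that makes the distinction concrete. It uses a hypothetical `Tree` type (not a Calcite class), and in it the direct children of `sink` differ from the tree's leaves:

    ```scala
    // Hypothetical plan tree: each node lists its direct inputs.
    final case class Tree(name: String, children: List[Tree] = Nil)

    object TreeTerms {
      // The "bottom" of the whole tree: nodes without inputs.
      def leaves(t: Tree): List[Tree] =
        if (t.children.isEmpty) List(t) else t.children.flatMap(leaves)

      // What the rule actually looks at: one parent and its direct children.
      def directChildren(parent: Tree): List[Tree] = parent.children
    }
    ```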


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110399631
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, windows (both group windows and over windows) do not support early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retractions because
    +      * it consumes them first and then generates its own retraction messages.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    --- End diff --
    
    make this a property of the operator?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110392466
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that initializes the retraction trait of a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the needToRetract property of a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the accMode of a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract if downstream nodes need
    +    * retraction messages from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retractions from upstream nodes; in addition, a
    +    * needToRetract node also needs retractions from its upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if the bottom RelNode does not contain needToRetract and the top RelNode
    +      * needs retractions from the bottom RelNode. Currently, operators which contain
    +      * aggregations need retractions from upstream nodes; in addition, a needToRetract node also
    +      * needs retractions from its upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    --- End diff --
    
    +space: `if (`
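The `needToRetractProcess` step in the diff above pushes needToRetract from a consumer down to its inputs. A minimal sketch of that propagation on a toy tree, assuming a hypothetical `Node` case class whose `kind` string stands in for the concrete `DataStreamRel` class (neither is a Flink or Calcite API), written with the `if (` spacing suggested here:

```scala
// Toy plan node for illustration only; `kind` stands in for the concrete
// DataStreamRel class. Neither `Node` nor `kind` is a Flink or Calcite API.
case class Node(kind: String, inputs: List[Node], needToRetract: Boolean = false)

object NeedToRetract {
  // Consumers that, per the quoted bottomNeedToRetract, need retractions
  // from their inputs.
  private val aggregates =
    Set("GroupAggregate", "GroupWindowAggregate", "OverAggregate")

  // An input must retract if its consumer is an aggregate or is itself
  // marked needToRetract (the property is inherited upstream).
  def inputsMustRetract(top: Node): Boolean =
    aggregates.contains(top.kind) || top.needToRetract

  // Top-down pass: a node's flag is final before its inputs are visited,
  // so on a tree-shaped plan a single traversal reaches the same result as
  // firing the rule repeatedly until nothing changes.
  def propagate(top: Node): Node = {
    val mark = inputsMustRetract(top)
    val newInputs = top.inputs.map { child =>
      // Only ever add the flag; an existing mark is never cleared.
      val marked = if (mark) child.copy(needToRetract = true) else child
      propagate(marked)
    }
    top.copy(inputs = newInputs)
  }
}
```

For example (plans listed from the top down), for `Calc -> GroupAggregate -> Scan` only the `Scan` below the aggregate is marked, while for `OverAggregate -> Calc -> Scan` both the `Calc` and the `Scan` are marked.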


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110390397
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that initializes the retraction trait of a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the needToRetract property of a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the accMode of a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract if downstream nodes need
    +    * retraction messages from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retractions from upstream nodes; in addition, a
    +    * needToRetract node also needs retractions from its upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if the bottom RelNode does not contain needToRetract and the top RelNode
    +      * needs retractions from the bottom RelNode. Currently, operators which contain
    +      * aggregations need retractions from upstream nodes; in addition, a needToRetract node also
    +      * needs retractions from its upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract mode may generate or forward additional
    +    * retraction messages.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of the input RelNode is a replace table, the input
    +      * RelNode is marked needToRetract, and it is not yet in AccRetract mode.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, windows (both group windows and over windows) do not support early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retractions because
    +      * it consumes them first and then generates its own retraction messages.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    +
    +      // check if there is a bottom RelNode working under AccRetract
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        if (traitSetContainAccRetract(bottomRels(i).getTraitSet)) {
    +          bottomRelsUnderAccRetract = true
    +        }
    +        i += 1
    +      }
    +
    +      // Return true
    +      // if topRel needs to work under AccRetract by itself
    +      // or bottom relNodes make topRel to work under AccRetract
    +      needSet ||
    +        (bottomRelsUnderAccRetract
    +          && !canDigestRetraction(topRel)
    +          && !traitSetContainAccRetract(traitSet))
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with AccMode properly set. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def accModeProcess(topRel: RelNode, bottomRels: ListBuffer[RelNode]): (RelNode, Boolean) = {
    +      var needTransform = false
    +      var i = 0
    +      var newTopRel = topRel
    +
    +      // process bottom RelNodes
    +      while (i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(needSetAccRetract(bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = setAccRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +
    +      // process top RelNode
    +      if (topRelNeedSetAccRetract(topRel, bottomRels)) {
    +        needTransform = true
    +        newTopRel = setAccRetract(topRel)
    +      }
    +
    +      newTopRel = newTopRel.copy(newTopRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom and top relnodes
    +      val (newTopRel, needTransform) = accModeProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +  }
    +
    +
    +  /**
    +    * Rule that initializes the retraction trait of a [[DataStreamRel]]. If a [[DataStreamRel]]
    +    * does not contain a retraction trait, one is created with needToRetract set to false and
    +    * AccMode set to Acc.
    +    */
    +  class InitProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "InitProcessRule") {
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val rel = call.rel(0).asInstanceOf[DataStreamRel]
    +      var retractionTrait = rel.getTraitSet.getTrait(RetractionTraitDef.INSTANCE)
    --- End diff --
    
    Simplify to 
    ```
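    // getTrait returns null when the trait is absent; contains() expects a RelTrait, not a RelTraitDef
    if (rel.getTraitSet.getTrait(RetractionTraitDef.INSTANCE) == null) {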
      val retractionTrait = new RetractionTrait(false, AccMode.Acc)
      call.transformTo(rel.copy(rel.getTraitSet.plus(retractionTrait), rel.getInputs))
    }
    ```
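Starting from the defaults that this initialization establishes (needToRetract = false, AccMode.Acc), the AccMode derivation in `AccModeProcessRule` above can be sketched as a single bottom-up pass. This is a minimal sketch assuming a hypothetical toy `Node` type instead of Calcite's `RelNode`, with needToRetract already derived; the class-name checks mirror `resultTableIsReplaceTable` and `canDigestRetraction` from the diff:

```scala
// Toy types for illustration only; they mirror AccMode and the retraction
// trait from the diff but are not the Flink classes.
sealed trait AccMode
case object Acc extends AccMode
case object AccRetract extends AccMode

// Defaults match the initialization: needToRetract = false, AccMode.Acc.
case class Node(
    kind: String,
    inputs: List[Node],
    needToRetract: Boolean = false,
    accMode: AccMode = Acc)

object AccModeDerivation {
  // In the quoted diff, only GroupAggregate produces a replace table.
  def producesUpdates(n: Node): Boolean = n.kind == "GroupAggregate"

  // Mirrors canDigestRetraction from the quoted diff.
  def canDigestRetraction(n: Node): Boolean =
    Set("GroupAggregate", "GroupWindowAggregate", "OverAggregate").contains(n.kind)

  // Bottom-up: derive the inputs first, then the node's own mode.
  def derive(n: Node): Node = {
    val newInputs = n.inputs.map(derive)
    // A replace-table producer whose consumer needs retractions emits them itself.
    val emitsOwnRetractions = n.needToRetract && producesUpdates(n)
    // Otherwise retractions from an input pass through unless this node digests them.
    val forwardsRetractions =
      newInputs.exists(_.accMode == AccRetract) && !canDigestRetraction(n)
    val mode: AccMode =
      if (emitsOwnRetractions || forwardsRetractions) AccRetract else n.accMode
    n.copy(inputs = newInputs, accMode = mode)
  }
}
```

Doing the derivation as one explicit bottom-up pass makes the ordering dependency (inputs before consumers) visible, whereas the rule-based version relies on being re-applied until no node changes.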


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r111372529
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that initializes the retraction trait of a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the needToRetract property of a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the accMode of a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract if downstream nodes need
    +    * retraction messages from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retractions from upstream nodes; in addition, a
    +    * needToRetract node also needs retractions from its upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if the bottom RelNode does not contain needToRetract and the top RelNode
    +      * needs retractions from the bottom RelNode. Currently, operators which contain
    +      * aggregations need retractions from upstream nodes; in addition, a needToRetract node also
    +      * needs retractions from its upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    --- End diff --
    
    Hi, you are right. I have extended `DataStreamRel` with three methods: `needsUpdatesAsRetraction()`, `producesUpdates()` and `consumesRetractions()`.
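A minimal sketch of how these three properties could replace the class matches in `bottomNeedToRetract`, `resultTableIsReplaceTable` and `canDigestRetraction`. It assumes hypothetical no-argument `Boolean` signatures with `false` defaults on a toy `StreamOp` trait; the actual signatures in the updated PR may differ:

```scala
// Toy operator trait for illustration; the three property names come from the
// comment above, but the no-argument signatures and defaults are assumptions.
trait StreamOp {
  def inputs: Seq[StreamOp]
  // Requires update changes from its inputs to arrive as retractions.
  def needsUpdatesAsRetraction: Boolean = false
  // Emits updates, i.e. its result is a replace table.
  def producesUpdates: Boolean = false
  // Consumes retraction messages instead of forwarding them.
  def consumesRetractions: Boolean = false
}

case class Scan(name: String) extends StreamOp {
  val inputs: Seq[StreamOp] = Nil
}

case class Calc(input: StreamOp) extends StreamOp {
  val inputs: Seq[StreamOp] = Seq(input)
}

case class GroupAggregate(input: StreamOp) extends StreamOp {
  val inputs: Seq[StreamOp] = Seq(input)
  override def needsUpdatesAsRetraction: Boolean = true
  override def producesUpdates: Boolean = true
  override def consumesRetractions: Boolean = true
}

case class OverAggregate(input: StreamOp) extends StreamOp {
  val inputs: Seq[StreamOp] = Seq(input)
  override def needsUpdatesAsRetraction: Boolean = true
  override def consumesRetractions: Boolean = true
}

object RetractionPredicates {
  // Replaces the class match in bottomNeedToRetract.
  def inputMustRetract(top: StreamOp, topNeedsToRetract: Boolean): Boolean =
    top.needsUpdatesAsRetraction || topNeedsToRetract

  // Replaces resultTableIsReplaceTable and canDigestRetraction in AccModeProcessRule.
  def isAccRetract(op: StreamOp, needsToRetract: Boolean, anyInputAccRetract: Boolean): Boolean =
    (needsToRetract && op.producesUpdates) || (anyInputAccRetract && !op.consumesRetractions)
}
```

With this shape, adding a new operator only requires overriding the relevant properties; the rules themselves no longer list concrete classes.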


[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110401135
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that initializes the retraction trait of a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the needToRetract property of a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the accMode of a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract if downstream nodes need
    +    * retraction messages from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retractions from upstream nodes; in addition, a
    +    * needToRetract node also needs retractions from its upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if the bottom RelNode does not contain needToRetract and the top RelNode
    +      * needs retractions from the bottom RelNode. Currently, operators which contain
    +      * aggregations need retractions from upstream nodes; in addition, a needToRetract node also
    +      * needs retractions from its upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, window (including group window and over window) does not contain early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] will digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction because
    +      * it digests retraction first then generate new retraction messages by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    +
    +      // check if there is a bottom RelNode working under AccRetract
    +      var i = 0
    +      while(i < bottomRels.size) {
    --- End diff --
    
    Simplify the `while` loop over `bottomRels` to
    ```
    val isBottomAccRetract = bottomRels.exists(b => traitSetContainAccRetract(b.getTraitSet))
    ```
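A self-contained sketch of the suggested simplification, assuming hypothetical stand-ins: `Node` and `Trait` replace Calcite's `RelNode` and Flink's `RetractionTrait`, and `containsAccRetract` mirrors `traitSetContainAccRetract`. Calcite is not needed on the classpath. It shows that the index-based loop and `exists` give the same answer.

```scala
// Hypothetical stand-ins for AccMode, RetractionTrait and RelNode (not the Flink/Calcite classes).
sealed trait AccMode
case object Acc extends AccMode
case object AccRetract extends AccMode

final case class Trait(needToRetract: Boolean, accMode: AccMode)
final case class Node(name: String, retractionTrait: Option[Trait])

// Mirrors traitSetContainAccRetract: a missing trait means "not AccRetract".
def containsAccRetract(n: Node): Boolean =
  n.retractionTrait.exists(_.accMode == AccRetract)

// Index-based loop, in the style of the original patch.
def anyAccRetractLoop(bottoms: Seq[Node]): Boolean = {
  var found = false
  var i = 0
  while (i < bottoms.size) {
    if (containsAccRetract(bottoms(i))) found = true
    i += 1
  }
  found
}

// Suggested version: one expression, no mutable state, stops at the first match.
def anyAccRetract(bottoms: Seq[Node]): Boolean =
  bottoms.exists(containsAccRetract)
```

Besides being shorter, `exists` short-circuits, whereas the loop always visits every input.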


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110401619
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, window (including group window and over window) does not contain early firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] will digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction because
    +      * it digests retraction first then generate new retraction messages by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    --- End diff --
    
    It is good practice in Scala to avoid `var`s and to prefer immutable `val`s where possible.
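One way to apply this to the helpers visible in the diff (`addNeedToRetract`, `setAccRetract`), which reassign a `var retractionTrait` after a null check. This is a sketch with hypothetical stand-in types: wrapping the nullable lookup in `Option` turns the null check plus reassignment into a single `val` expression. In the real code, the argument would presumably be the result of `traitSet.getTrait(RetractionTraitDef.INSTANCE)`, which the patch treats as nullable.

```scala
// Hypothetical stand-ins; not the Flink classes.
sealed trait AccMode
case object Acc extends AccMode
case object AccRetract extends AccMode

final case class RetractionTrait(needToRetract: Boolean, accMode: AccMode)

// Same result as addNeedToRetract's var-based null check:
// keep the existing AccMode if a trait is present, otherwise default to Acc.
def withNeedToRetract(existing: RetractionTrait): RetractionTrait = {
  val accMode = Option(existing).map(_.accMode).getOrElse(Acc)
  RetractionTrait(needToRetract = true, accMode)
}

// Same result as setAccRetract:
// keep needToRetract if a trait is present, otherwise default to false.
def withAccRetract(existing: RetractionTrait): RetractionTrait = {
  val needToRetract = Option(existing).exists(_.needToRetract)
  RetractionTrait(needToRetract, AccRetract)
}
```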



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110534493
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    --- End diff --
    
    The last two cases
    
        case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
        case _ => false
    
    can be simplified to
    
        case _ => traitSetContainNeedToRetract(topRel.getTraitSet)
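A runnable check that the guarded form and the simplified form agree. This is a sketch with hypothetical types: `GroupAggregate`, `GroupWindowAggregate`, `OverAggregate` and `Calc` stand in for the `DataStreamRel` subclasses, and the `needToRetract` field stands in for `traitSetContainNeedToRetract(topRel.getTraitSet)`. The simplified version also merges the three type cases into one alternative. That step is optional.

```scala
// Hypothetical stand-ins for the DataStreamRel subclasses matched in bottomNeedToRetract.
sealed trait Op { def needToRetract: Boolean }
final case class GroupAggregate(needToRetract: Boolean) extends Op
final case class GroupWindowAggregate(needToRetract: Boolean) extends Op
final case class OverAggregate(needToRetract: Boolean) extends Op
final case class Calc(needToRetract: Boolean) extends Op

// Original form: a guarded wildcard followed by a catch-all returning false.
def topNeedsRetraction(top: Op): Boolean = top match {
  case _: GroupAggregate => true
  case _: GroupWindowAggregate => true
  case _: OverAggregate => true
  case _ if top.needToRetract => true
  case _ => false
}

// Simplified form: the catch-all returns the predicate directly.
def topNeedsRetractionSimplified(top: Op): Boolean = top match {
  case _: GroupAggregate | _: GroupWindowAggregate | _: OverAggregate => true
  case _ => top.needToRetract
}
```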



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110391516
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    --- End diff --
    
    I would order the methods in the class by call order, i.e., `onMatch()` first, `needToRetractProcess()` second, ...
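A sketch of that layout. It uses a hypothetical, simplified model (`Rel`, `NeedToRetractSketch`) instead of Calcite's `RelNode`, `RelOptRule` and `RelOptRuleCall`. The entry point comes first, and each helper follows the method that calls it. Returning `Option[Rel]` stands in for `call.transformTo(newTopRel)`. The `aggregates` flag stands in for the three aggregate classes matched in `bottomNeedToRetract`. The index-based `while` loop in `needToRetractProcess` is also replaced by `map`/`exists` over an immutable list.

```scala
// Hypothetical, simplified model of the rule; not Calcite's RelOptRule API.
final case class Rel(name: String, aggregates: Boolean, needToRetract: Boolean, inputs: List[Rel])

object NeedToRetractSketch {

  // 1. Entry point: returns the rewritten node, or None if nothing changed
  //    (standing in for call.transformTo(newTopRel)).
  def onMatch(top: Rel): Option[Rel] = {
    val (newTop, needTransform) = needToRetractProcess(top, top.inputs)
    if (needTransform) Some(newTop) else None
  }

  // 2. Called by onMatch: marks every input that must produce retractions.
  def needToRetractProcess(top: Rel, bottoms: List[Rel]): (Rel, Boolean) = {
    val newBottoms = bottoms.map(b => if (bottomNeedToRetract(top, b)) addNeedToRetract(b) else b)
    val needTransform = bottoms.exists(bottomNeedToRetract(top, _))
    (top.copy(inputs = newBottoms), needTransform)
  }

  // 3. Called by needToRetractProcess: same decision as the patch's bottomNeedToRetract.
  def bottomNeedToRetract(top: Rel, bottom: Rel): Boolean =
    !bottom.needToRetract && (top.aggregates || top.needToRetract)

  // 4. Called by needToRetractProcess: returns a copy with needToRetract set.
  def addNeedToRetract(rel: Rel): Rel = rel.copy(needToRetract = true)
}
```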



[GitHub] flink issue #3696: [FLINK-6090] [table] Add RetractionRule at the stage of d...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/3696
  
    Hi @fhueske, thanks for your review and refactorings. I think they are pretty good, and I have learned a lot from them. I left a few comments in your PR.
    
    As for the hard-coding problem, I think you are right: it's better to extend DataStreamRel with a couple of methods. I have added some methods to DataStreamRel, so you can use them directly when you squash your PR into my commit. Thanks!
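A sketch of what replacing the hard-coded class matches with methods on the node could look like. The trait `StreamRel`, the operator classes and the method names (`needsRetractionFromInputs`, `canDigestRetraction`, `producesReplaceTable`) are hypothetical. They are not the methods actually added to `DataStreamRel`. The defaults mirror the patch: all three aggregates need and digest retractions, and only the group aggregate produces a replace table.

```scala
// Hypothetical trait and operators; the method names are illustrative only.
trait StreamRel {
  // Defaults for operators that neither aggregate nor emit updates.
  def needsRetractionFromInputs: Boolean = false
  def canDigestRetraction: Boolean = false
  def producesReplaceTable: Boolean = false
}

class GroupAggregate extends StreamRel {
  override def needsRetractionFromInputs: Boolean = true
  override def canDigestRetraction: Boolean = true
  override def producesReplaceTable: Boolean = true
}

class GroupWindowAggregate extends StreamRel {
  override def needsRetractionFromInputs: Boolean = true
  override def canDigestRetraction: Boolean = true
}

class Calc extends StreamRel

// The rule asks the node instead of matching on its class,
// so a new operator only needs to override the relevant methods.
def topNeedsRetraction(top: StreamRel, topNeedToRetract: Boolean): Boolean =
  top.needsRetractionFromInputs || topNeedToRetract
```

The benefit is locality. With this layout, adding an operator touches only that operator, not every rule that pattern-matches on concrete classes.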
    




[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 closed the pull request at:

    https://github.com/apache/flink/pull/3696



[GitHub] flink pull request #3696: [FLINK-6090] [table] Add RetractionRule at the sta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110398315
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that initializes the retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decides the accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract when there are downstream
    +    * nodes that need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes; in addition, a
    +    * needToRetract node also needs retraction from its upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if the bottom RelNode does not contain needToRetract and the top RelNode
    +      * needs retraction from the bottom RelNode. Currently, operators which contain aggregations
    +      * need retraction from upstream nodes; in addition, a needToRetract node also needs
    +      * retraction from its upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract mode may generate or forward
    +    * additional retraction messages.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of the input RelNode is a replace table, the input
    +      * RelNode needs to retract, and it is not already in AccRetract mode.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, windows (including group windows and over windows) do not support early
    +      * firing, so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction because
    +      * it consumes retraction messages first and then generates new ones by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean = {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    +
    +      // check if there is a bottom RelNode working under AccRetract
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        if (traitSetContainAccRetract(bottomRels(i).getTraitSet)) {
    +          bottomRelsUnderAccRetract = true
    +        }
    +        i += 1
    +      }
    +
    +      // Return true
    +      // if topRel needs to work under AccRetract by itself
    +      // or bottom relNodes make topRel work under AccRetract
    +      needSet ||
    +        (bottomRelsUnderAccRetract
    +          && !canDigestRetraction(topRel)
    +          && !traitSetContainAccRetract(traitSet))
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new
    +      * topRel contains new bottomRels with AccMode properly set. The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def accModeProcess(topRel: RelNode, bottomRels: ListBuffer[RelNode]): (RelNode, Boolean) = {
    +      var needTransform = false
    +      var i = 0
    +      var newTopRel = topRel
    +
    +      // process bottom RelNodes
    +      while (i < bottomRels.size) {
    --- End diff --
    
    Please create a new list of bottom rels instead of modifying the existing one in place:
    ```scala
    val transformedBottom = for (b <- bottomRels) yield {
      if (needSetAccRetract(b)) {
        needTransform = true
        setAccRetract(b)
      } else {
        b
      }
    }
    ```
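The reviewer's suggestion can be sketched in isolation, outside the Flink code base. This is a hypothetical model, not code from the PR: the `Node` case class stands in for `RelNode`, and `needSetAccRetract`/`setAccRetract` are simplified imitations of the helpers in the diff. It derives the change flag with `exists` instead of assigning a `var` inside the `yield`, which is one possible variant of the snippet above, not necessarily what the PR ended up doing.

```scala
import scala.collection.mutable.ListBuffer

object BottomRelsSketch {

  // Hypothetical stand-in for a RelNode, carrying only the flags this sketch needs.
  case class Node(
      name: String,
      replaceTable: Boolean,
      needToRetract: Boolean,
      accRetract: Boolean)

  // Simplified imitation of needSetAccRetract from the diff.
  def needSetAccRetract(n: Node): Boolean =
    n.needToRetract && n.replaceTable && !n.accRetract

  // Simplified imitation of setAccRetract: returns a modified copy and never mutates n.
  def setAccRetract(n: Node): Node = n.copy(accRetract = true)

  // Builds a new buffer of bottom nodes; the input buffer is left untouched.
  // The flag is true iff at least one bottom node gets rewritten.
  def transformBottoms(bottomRels: ListBuffer[Node]): (ListBuffer[Node], Boolean) = {
    val needTransform = bottomRels.exists(needSetAccRetract)
    val transformedBottom = bottomRels.map { b =>
      if (needSetAccRetract(b)) setAccRetract(b) else b
    }
    (transformedBottom, needTransform)
  }
}
```

Given a buffer holding an aggregation that needs to retract followed by a plain scan, only the first element of the returned buffer is switched to AccRetract, and the original buffer still holds the unmodified nodes.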

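The interplay of the three rules in the diff can also be approximated without Calcite. The sketch below is a hypothetical model, not Flink code: `Kind`, `Plan`, `markNeedToRetract`, `markAccMode` and `derive` are invented for illustration. The case-class defaults play the role of InitProcessRule, and plain recursion replaces the HepPlanner that fires the real rules until nothing changes. The predicates mirror `bottomNeedToRetract`, `resultTableIsReplaceTable` and `canDigestRetraction`: needToRetract flows top-down from aggregations into their inputs, while AccRetract flows bottom-up until an operator that can digest retractions absorbs it.

```scala
object RetractionSketch {

  // Hypothetical operator kinds, named after the DataStream nodes in the diff.
  sealed trait Kind
  case object Scan extends Kind
  case object Calc extends Kind
  case object GroupAggregate extends Kind
  case object GroupWindowAggregate extends Kind
  case object OverAggregate extends Kind

  // Hypothetical immutable plan node; the defaults act like InitProcessRule.
  case class Plan(
      kind: Kind,
      inputs: List[Plan] = Nil,
      needToRetract: Boolean = false,
      accRetract: Boolean = false)

  // Aggregations consume retractions (compare canDigestRetraction in the diff).
  private def isAggregate(k: Kind): Boolean = k match {
    case GroupAggregate | GroupWindowAggregate | OverAggregate => true
    case _ => false
  }

  // Top-down pass modelled on bottomNeedToRetract: aggregations, and nodes that
  // themselves need to retract, mark all of their inputs as needToRetract.
  def markNeedToRetract(p: Plan): Plan = {
    val asksInputs = isAggregate(p.kind) || p.needToRetract
    val newInputs = p.inputs.map { in =>
      markNeedToRetract(if (asksInputs) in.copy(needToRetract = true) else in)
    }
    p.copy(inputs = newInputs)
  }

  // Bottom-up pass modelled on topRelNeedSetAccRetract: a GroupAggregate that needs
  // to retract switches to AccRetract (its result is a replace table), and AccRetract
  // inputs propagate upward unless the current node can digest retractions.
  def markAccMode(p: Plan): Plan = {
    val newInputs = p.inputs.map(markAccMode)
    val selfNeeds = p.needToRetract && p.kind == GroupAggregate
    val fromInputs = newInputs.exists(_.accRetract) && !isAggregate(p.kind)
    p.copy(inputs = newInputs, accRetract = p.accRetract || selfNeeds || fromInputs)
  }

  // The passes run in the order the diff's class comment requires.
  def derive(p: Plan): Plan = markAccMode(markNeedToRetract(p))
}
```

For a two-level aggregation (a GroupAggregate over a Calc over another GroupAggregate over a Scan), this model puts the inner aggregation and the Calc into AccRetract mode, while the outer aggregation absorbs the retractions and stays in Acc mode.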
