Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:40 UTC

[06/15] flink git commit: [FLINK-6090] [table] Add RetractionRules for annotating AccMode to DataStreamRel nodes.

[FLINK-6090] [table] Add RetractionRules for annotating AccMode to DataStreamRel nodes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc54abc6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc54abc6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc54abc6

Branch: refs/heads/master
Commit: dc54abc694c3695270695be2e5bc59a7f91ee460
Parents: 8f78824
Author: hequn.chq <he...@alibaba-inc.com>
Authored: Fri Apr 7 13:12:04 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:54 2017 +0200

----------------------------------------------------------------------
 .../nodes/datastream/DataStreamCorrelate.scala  |   6 +-
 .../datastream/DataStreamGroupAggregate.scala   |   6 +
 .../DataStreamGroupWindowAggregate.scala        |   4 +
 .../datastream/DataStreamOverAggregate.scala    |   6 +-
 .../plan/nodes/datastream/DataStreamRel.scala   |  17 +
 .../nodes/datastream/retractionTraitDefs.scala  |  81 +++++
 .../nodes/datastream/retractionTraits.scala     | 100 ++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   7 +-
 .../datastream/DataStreamRetractionRules.scala  | 248 ++++++++++++++
 .../flink/table/CalciteConfigBuilderTest.scala  |  39 +--
 .../table/plan/rules/RetractionRulesTest.scala  | 321 +++++++++++++++++++
 11 files changed, 810 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 8955110..899d8ef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -37,14 +37,14 @@ class DataStreamCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputSchema: RowSchema,
-    inputNode: RelNode,
+    input: RelNode,
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
     schema: RowSchema,
     joinSchema: RowSchema,
     joinType: SemiJoinType,
     ruleDescription: String)
-  extends SingleRel(cluster, traitSet, inputNode)
+  extends SingleRel(cluster, traitSet, input)
   with CommonCorrelate
   with DataStreamRel {
 
@@ -86,7 +86,7 @@ class DataStreamCorrelate(
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
-    val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 19f90c7..3555c80 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -60,6 +60,12 @@ class DataStreamGroupAggregate(
 
   override def deriveRowType() = schema.logicalType
 
+  override def needsUpdatesAsRetraction = true
+
+  override def producesUpdates = true
+
+  override def consumesRetractions = true
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamGroupAggregate(
       cluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 5aced66..ea4b0bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -53,6 +53,10 @@ class DataStreamGroupWindowAggregate(
 
   override def deriveRowType(): RelDataType = schema.logicalType
 
+  override def needsUpdatesAsRetraction = true
+
+  override def consumesRetractions = true
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamGroupWindowAggregate(
       window,

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index db31f32..fb912c4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -49,6 +49,10 @@ class DataStreamOverAggregate(
 
   override def deriveRowType(): RelDataType = schema.logicalType
 
+  override def needsUpdatesAsRetraction = true
+
+  override def consumesRetractions = true
+
   override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
     new DataStreamOverAggregate(
       logicWindow,
@@ -297,7 +301,7 @@ class DataStreamOverAggregate(
     }ORDER BY: ${orderingToString(inputSchema.logicalType,
         overWindow.orderKeys.getFieldCollations)}, " +
       s"${if (overWindow.isRows) "ROWS" else "RANGE"}" +
-      s"${windowRange(logicWindow, overWindow, inputNode.asInstanceOf[DataStreamRel])}, " +
+      s"${windowRange(logicWindow, overWindow, inputNode)}, " +
       s"select: (${
         aggregationToString(
           inputSchema.logicalType,

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 03938f3..128da81 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -33,4 +33,21 @@ trait DataStreamRel extends FlinkRelNode {
     */
   def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
 
+  /**
+    * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction
+    * messages.
+    */
+  def needsUpdatesAsRetraction: Boolean = false
+
+  /**
+    * Whether the [[DataStreamRel]] produces update and delete changes.
+    */
+  def producesUpdates: Boolean = false
+
+  /**
+    * Whether the [[DataStreamRel]] consumes retraction messages instead of forwarding them.
+    * The node might or might not produce new retraction messages.
+    */
+  def consumesRetractions: Boolean = false
+
 }
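
For illustration, a minimal sketch (not part of the commit) showing how a rule or debugging
utility might read the three new flags introduced on DataStreamRel; the helper name is
hypothetical:

  import org.apache.flink.table.plan.nodes.datastream.DataStreamRel

  // Hypothetical helper: summarizes the retraction-related flags of a physical node.
  def describeRetractionFlags(node: DataStreamRel): String =
    s"needsUpdatesAsRetraction=${node.needsUpdatesAsRetraction}, " +
      s"producesUpdates=${node.producesUpdates}, " +
      s"consumesRetractions=${node.consumesRetractions}"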

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala
new file mode 100644
index 0000000..c43d951
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef}
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Definition of the [[UpdateAsRetractionTrait]].
+  */
+class UpdateAsRetractionTraitDef extends RelTraitDef[UpdateAsRetractionTrait] {
+  override def convert(
+      planner: RelOptPlanner,
+      rel: RelNode,
+      toTrait: UpdateAsRetractionTrait,
+      allowInfiniteCostConverters: Boolean): RelNode = {
+
+    rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs)
+  }
+
+  override def canConvert(
+      planner: RelOptPlanner,
+      fromTrait: UpdateAsRetractionTrait,
+      toTrait: UpdateAsRetractionTrait): Boolean = true
+
+  override def getTraitClass: Class[UpdateAsRetractionTrait] = classOf[UpdateAsRetractionTrait]
+
+  override def getSimpleName: String = this.getClass.getSimpleName
+
+  override def getDefault: UpdateAsRetractionTrait = UpdateAsRetractionTrait.DEFAULT
+}
+
+object UpdateAsRetractionTraitDef {
+  val INSTANCE = new UpdateAsRetractionTraitDef
+}
+
+/**
+  * Definition of the [[AccModeTrait]].
+  */
+class AccModeTraitDef extends RelTraitDef[AccModeTrait] {
+
+  override def convert(
+      planner: RelOptPlanner,
+      rel: RelNode,
+      toTrait: AccModeTrait,
+      allowInfiniteCostConverters: Boolean): RelNode = {
+
+    rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs)
+  }
+
+  override def canConvert(
+      planner: RelOptPlanner,
+      fromTrait: AccModeTrait,
+      toTrait: AccModeTrait): Boolean = true
+
+  override def getTraitClass: Class[AccModeTrait] = classOf[AccModeTrait]
+
+  override def getSimpleName: String = this.getClass.getSimpleName
+
+  override def getDefault: AccModeTrait = AccModeTrait.DEFAULT
+}
+
+object AccModeTraitDef {
+  val INSTANCE = new AccModeTraitDef
+}
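
For illustration, a minimal sketch (not part of the commit), assuming the two singleton trait
definitions are registered with the Calcite planner that runs the decoration rules; the function
name and the wiring are hypothetical and Flink's actual planner setup may differ:

  import org.apache.calcite.plan.RelOptPlanner
  import org.apache.flink.table.plan.nodes.datastream.{AccModeTraitDef, UpdateAsRetractionTraitDef}

  // Hypothetical wiring: make the planner aware of the two new trait kinds so that
  // UpdateAsRetractionTrait and AccModeTrait instances can be attached to RelNodes.
  def registerRetractionTraitDefs(planner: RelOptPlanner): Unit = {
    planner.addRelTraitDef(UpdateAsRetractionTraitDef.INSTANCE)
    planner.addRelTraitDef(AccModeTraitDef.INSTANCE)
  }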

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
new file mode 100644
index 0000000..c3b43ba
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
+import org.apache.flink.table.plan.nodes.datastream.AccMode.AccMode
+
+/** Tracks if a [[org.apache.calcite.rel.RelNode]] needs to send update and delete changes as
+  * retraction messages.
+  */
+class UpdateAsRetractionTrait extends RelTrait {
+
+  /**
+    * Defines whether the [[org.apache.calcite.rel.RelNode]] needs to send update and delete
+    * changes as retraction messages.
+    */
+  private var updateAsRetraction: Boolean = false
+
+  def this(updateAsRetraction: Boolean) {
+    this()
+    this.updateAsRetraction = updateAsRetraction
+  }
+
+  def sendsUpdatesAsRetractions: Boolean = updateAsRetraction
+
+  override def register(planner: RelOptPlanner): Unit = { }
+
+  override def getTraitDef: RelTraitDef[_ <: RelTrait] = UpdateAsRetractionTraitDef.INSTANCE
+
+  override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`)
+
+  override def toString: String = updateAsRetraction.toString
+
+}
+
+object UpdateAsRetractionTrait {
+  val DEFAULT = new UpdateAsRetractionTrait(false)
+}
+
+/**
+  * Tracks the AccMode of a [[org.apache.calcite.rel.RelNode]].
+  */
+class AccModeTrait extends RelTrait {
+
+  /** Defines the accumulating mode for an operator. */
+  private var accMode = AccMode.Acc
+
+  def this(accMode: AccMode) {
+    this()
+    this.accMode = accMode
+  }
+
+  def getAccMode: AccMode = accMode
+
+  override def register(planner: RelOptPlanner): Unit = { }
+
+  override def getTraitDef: RelTraitDef[_ <: RelTrait] = AccModeTraitDef.INSTANCE
+
+  override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`)
+
+  override def toString: String = accMode.toString
+}
+
+object AccModeTrait {
+  val DEFAULT = new AccModeTrait(AccMode.Acc)
+}
+
+/**
+  * The AccMode indicates which kinds of messages a [[org.apache.calcite.rel.RelNode]] might
+  * produce.
+  * In [[AccMode.Acc]], the node only emits accumulate messages.
+  * In [[AccMode.AccRetract]], the node produces accumulate messages for insert changes,
+  * retraction messages for delete changes, and accumulate and retraction messages
+  * for update changes.
+  */
+object AccMode extends Enumeration {
+  type AccMode = Value
+
+  val Acc        = Value // Operator produces only accumulate (insert) messages
+  val AccRetract = Value // Operator produces accumulate (insert, update) and
+                         //   retraction (delete, update) messages
+}
+
+
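
For illustration, a minimal sketch (not part of the commit) of how a consumer of the annotated
plan could check whether a node was put into AccMode.AccRetract and therefore may emit explicit
retraction messages; the helper is hypothetical:

  import org.apache.calcite.rel.RelNode
  import org.apache.flink.table.plan.nodes.datastream.{AccMode, AccModeTraitDef}

  // Hypothetical check: true if the node carries an AccModeTrait with AccMode.AccRetract,
  // i.e., updates and deletes may arrive as retraction messages.
  def emitsRetractions(node: RelNode): Boolean = {
    val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
    accModeTrait != null && accModeTrait.getAccMode == AccMode.AccRetract
  }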

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index f4de651..fad60fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.rules
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.table.plan.rules.common._
+import org.apache.flink.table.plan.rules.logical._
 import org.apache.flink.table.plan.rules.dataSet._
 import org.apache.flink.table.plan.rules.datastream._
-import org.apache.flink.table.plan.rules.logical._
 import org.apache.flink.table.plan.nodes.logical._
 
 object FlinkRuleSets {
@@ -191,7 +191,10 @@ object FlinkRuleSets {
     * RuleSet to decorate plans for stream / DataStream execution
     */
   val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
-    // rules
+    // retraction rules
+    DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE,
+    DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
+    DataStreamRetractionRules.ACCMODE_INSTANCE
   )
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
new file mode 100644
index 0000000..aeb67b6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule.{operand, _}
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.plan.nodes.datastream._
+
+import scala.collection.JavaConverters._
+
+/**
+  * Collection of rules to annotate [[DataStreamRel]] nodes with retraction information.
+  *
+  * The rules have to be applied in the following order:
+  * - [[DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE]]
+  * - [[DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE]]
+  * - [[DataStreamRetractionRules.ACCMODE_INSTANCE]]
+  *
+  * The rules will assign an [[AccModeTrait]] to each [[DataStreamRel]] node of the plan. The
+  * trait defines the [[AccMode]] of a node.
+  * - [[AccMode.Acc]] defines that the node produces only accumulate messages, i.e., all types of
+  * modifications (insert, update, delete) are encoded as accumulate messages.
+  * - [[AccMode.AccRetract]] defines that the node produces accumulate and retraction messages.
+  * Insert modifications are encoded as accumulate messages, delete modifications as retraction
+  * messages, and update modifications as a pair of accumulate and retraction messages.
+  *
+  */
+object DataStreamRetractionRules {
+
+  /**
+    * Rule instance that assigns default retraction to [[DataStreamRel]] nodes.
+    */
+  val DEFAULT_RETRACTION_INSTANCE = new AssignDefaultRetractionRule()
+
+  /**
+    * Rule instance that checks if [[DataStreamRel]] nodes need to ship updates as retractions.
+    */
+  val UPDATES_AS_RETRACTION_INSTANCE = new SetUpdatesAsRetractionRule()
+
+  /**
+    * Rule instance that assigns the [[AccMode]] to [[DataStreamRel]] nodes.
+    */
+  val ACCMODE_INSTANCE = new SetAccModeRule()
+
+  /**
+    * Gets all child RelNodes of a RelNode.
+    *
+    * @param parent The parent RelNode
+    * @return All child nodes
+    */
+  def getChildRelNodes(parent: RelNode): Seq[RelNode] = {
+    parent.getInputs.asScala.map(_.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  /**
+    * Checks if a [[RelNode]] ships updates as retractions.
+    *
+    * @param node The node to check.
+    * @return True if the node ships updates as retractions, false otherwise.
+    */
+  def sendsUpdatesAsRetraction(node: RelNode): Boolean = {
+    val retractionTrait = node.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
+    retractionTrait != null && retractionTrait.sendsUpdatesAsRetractions
+  }
+
+  /**
+    * Rule that assigns the default retraction information to [[DataStreamRel]] nodes.
+    * The default is to not send updates as retraction messages and to operate in [[AccMode.Acc]].
+    */
+  class AssignDefaultRetractionRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "AssignDefaultRetractionRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+      val rel = call.rel(0).asInstanceOf[DataStreamRel]
+      val traits = rel.getTraitSet
+
+      val traitsWithUpdateAsRetrac = if (!traits.contains(UpdateAsRetractionTraitDef.INSTANCE)) {
+        traits.plus(UpdateAsRetractionTrait.DEFAULT)
+      } else {
+        traits
+      }
+      val traitsWithAccMode =
+        if (!traitsWithUpdateAsRetrac.contains(AccModeTraitDef.INSTANCE)) {
+          traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT)
+        } else {
+          traitsWithUpdateAsRetrac
+        }
+
+      if (traits != traitsWithAccMode) {
+        call.transformTo(rel.copy(traitsWithAccMode, rel.getInputs))
+      }
+    }
+  }
+
+  /**
+    * Rule that annotates all [[DataStreamRel]] nodes that need to send out update and delete
+    * changes as retraction messages.
+    */
+  class SetUpdatesAsRetractionRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "SetUpdatesAsRetractionRule") {
+
+    /**
+      * Checks if a [[RelNode]] requires that update and delete changes are sent with retraction
+      * messages.
+      */
+    def needsUpdatesAsRetraction(node: RelNode): Boolean = {
+      node match {
+        case _ if sendsUpdatesAsRetraction(node) => true
+        case dsr: DataStreamRel => dsr.needsUpdatesAsRetraction
+      }
+    }
+
+    /**
+      * Annotates a [[RelNode]] to send out update and delete changes as retraction messages.
+      */
+    def setUpdatesAsRetraction(relNode: RelNode): RelNode = {
+      val traitSet = relNode.getTraitSet
+      relNode.copy(traitSet.plus(new UpdateAsRetractionTrait(true)), relNode.getInputs)
+    }
+
+    /**
+      * Annotates the children of a parent node with the information that they need to forward
+      * update and delete modifications as retraction messages.
+      *
+      * A child needs to produce retraction messages, if
+      *
+      * 1. its parent requires retraction messages by itself because it is a certain type
+      *    of operator, such as a [[DataStreamGroupAggregate]] or [[DataStreamOverAggregate]], or
+      * 2. its parent requires retraction because its own parent requires retraction
+      *    (transitive requirement).
+      *
+      */
+    override def onMatch(call: RelOptRuleCall): Unit = {
+      val parent = call.rel(0).asInstanceOf[DataStreamRel]
+
+      val children = getChildRelNodes(parent)
+      // check if children need to send out retraction messages
+      val newChildren = for (c <- children) yield {
+        if (needsUpdatesAsRetraction(parent) && !sendsUpdatesAsRetraction(c)) {
+          setUpdatesAsRetraction(c)
+        } else {
+          c
+        }
+      }
+
+      // update parent if a child was updated
+      if (children != newChildren) {
+        call.transformTo(parent.copy(parent.getTraitSet, newChildren.asJava))
+      }
+    }
+  }
+
+  /**
+    * Sets the [[AccMode]] of [[DataStreamRel]] nodes.
+    */
+  class SetAccModeRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "SetAccModeRule") {
+
+    /**
+      * Checks if a [[RelNode]] produces update and delete changes.
+      */
+    def producesUpdates(relNode: RelNode): Boolean = {
+      relNode match {
+        case dsr: DataStreamRel => dsr.producesUpdates
+      }
+    }
+
+    /**
+      * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
+      */
+    def isAccRetract(node: RelNode): Boolean = {
+      val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
+      null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
+    }
+
+    /**
+      * Sets [[AccMode.AccRetract]] on a [[RelNode]].
+      */
+    def setAccRetract(relNode: RelNode): RelNode = {
+      val traitSet = relNode.getTraitSet
+      relNode.copy(traitSet.plus(new AccModeTrait(AccMode.AccRetract)), relNode.getInputs)
+    }
+
+    /**
+      * Checks if a [[RelNode]] consumes retraction messages instead of forwarding them.
+      * The node might or might not produce new retraction messages.
+      * This is checked by [[producesRetractions()]].
+      */
+    def consumesRetractions(relNode: RelNode): Boolean = {
+      relNode match {
+        case dsr: DataStreamRel => dsr.consumesRetractions
+      }
+    }
+
+    /**
+      * Checks if a [[RelNode]] produces retraction messages.
+      */
+    def producesRetractions(node: RelNode): Boolean = {
+      sendsUpdatesAsRetraction(node) && producesUpdates(node)
+    }
+
+    /**
+      * Checks if a [[RelNode]] forwards retraction messages from its children.
+      */
+    def forwardsRetractions(parent: RelNode, children: Seq[RelNode]): Boolean = {
+      children.exists(c => isAccRetract(c)) && !consumesRetractions(parent)
+    }
+
+    /**
+      * Updates the [[AccMode]] of a [[RelNode]] and its children if necessary.
+      */
+    override def onMatch(call: RelOptRuleCall): Unit = {
+      val parent = call.rel(0).asInstanceOf[DataStreamRel]
+      val children = getChildRelNodes(parent)
+
+      // check if the AccMode of the parent needs to be updated
+      if (!isAccRetract(parent) &&
+          (producesRetractions(parent) || forwardsRetractions(parent, children))) {
+        call.transformTo(setAccRetract(parent))
+      }
+    }
+  }
+
+}
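
For illustration, a minimal sketch (not part of the commit), assuming the three rule instances
are applied in the documented order by a Calcite HepPlanner during the decoration phase; Flink's
actual optimizer wiring may differ:

  import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
  import org.apache.calcite.rel.RelNode
  import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules

  // Hypothetical decoration step: annotate a physical DataStream plan with
  // UpdateAsRetraction and AccMode traits by running the rules in order.
  def annotateRetractions(physicalPlan: RelNode): RelNode = {
    val program = new HepProgramBuilder()
      .addRuleInstance(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)
      .addRuleInstance(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE)
      .addRuleInstance(DataStreamRetractionRules.ACCMODE_INSTANCE)
      .build()
    val planner = new HepPlanner(program)
    planner.setRoot(physicalPlan)
    planner.findBestExp()
  }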

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index ed29f0f..0c337e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -22,6 +22,7 @@ import org.apache.calcite.rel.rules._
 import org.apache.calcite.sql.fun.{OracleSqlOperatorTable, SqlStdOperatorTable}
 import org.apache.calcite.tools.RuleSets
 import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.junit.Assert._
 import org.junit.Test
 
@@ -51,11 +52,11 @@ class CalciteConfigBuilderTest {
   def testRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-        .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-        .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-        .build()
+      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+      .build()
 
     assertFalse(cc.replacesNormRuleSet)
     assertTrue(cc.getNormRuleSet.isDefined)
@@ -232,62 +233,62 @@ class CalciteConfigBuilderTest {
   def testReplaceDecorationRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
       .build()
 
     assertEquals(true, cc.replacesDecoRuleSet)
     assertTrue(cc.getDecoRuleSet.isDefined)
     val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
     assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+    assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
   }
 
   @Test
   def testReplaceDecorationAddRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
+      .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE))
       .build()
 
     assertEquals(true, cc.replacesDecoRuleSet)
     assertTrue(cc.getDecoRuleSet.isDefined)
     val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
     assertEquals(2, cSet.size)
-    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
-    assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+    assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+    assertTrue(cSet.contains(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE))
   }
 
   @Test
   def testAddDecorationRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
       .build()
 
     assertEquals(false, cc.replacesDecoRuleSet)
     assertTrue(cc.getDecoRuleSet.isDefined)
     val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
     assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+    assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
   }
 
   @Test
   def testAddAddDecorationRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE,
-                                      ReduceExpressionsRule.CALC_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
+                                      DataStreamRetractionRules.ACCMODE_INSTANCE))
       .build()
 
     assertEquals(false, cc.replacesDecoRuleSet)
     assertTrue(cc.getDecoRuleSet.isDefined)
     val cList = cc.getDecoRuleSet.get.iterator().asScala.toList
     assertEquals(3, cList.size)
-    assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE)
-    assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE)
-    assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE)
+    assertEquals(cList.head, DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)
+    assertEquals(cList(1), DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE)
+    assertEquals(cList(2), DataStreamRetractionRules.ACCMODE_INSTANCE)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/dc54abc6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
new file mode 100644
index 0000000..580029f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.plan.nodes.datastream._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+
+
+class RetractionRulesTest extends TableTestBase {
+
+  def streamTestForRetractionUtil(): StreamTableTestForRetractionUtil = {
+    new StreamTableTestForRetractionUtil()
+  }
+
+  @Test
+  def testSelect(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table.select('word, 'number)
+
+    val expected = s"DataStreamScan(false, Acc)"
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // one level unbounded groupBy
+  @Test
+  def testGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+    val defaultStatus = "false, Acc"
+
+    val resultTable = table
+      .groupBy('word)
+      .select('number.count)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          "DataStreamScan(true, Acc)",
+          s"$defaultStatus"
+        ),
+        s"$defaultStatus"
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // two level unbounded groupBy
+  @Test
+  def testTwoGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+    val defaultStatus = "false, Acc"
+
+    val resultTable = table
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .groupBy('count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            unaryNode(
+              "DataStreamGroupAggregate",
+              "DataStreamScan(true, Acc)",
+              "true, AccRetract"
+            ),
+            "true, AccRetract"
+          ),
+          s"$defaultStatus"
+        ),
+        s"$defaultStatus"
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // group window
+  @Test
+  def testGroupWindow(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number, 'rowtime.rowtime)
+    val defaultStatus = "false, Acc"
+
+    val resultTable = table
+      .window(Tumble over 50.milli on 'rowtime as 'w)
+      .groupBy('w, 'word)
+      .select('word, 'number.count as 'count)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          "DataStreamScan(true, Acc)",
+          s"$defaultStatus"
+        ),
+        s"$defaultStatus"
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // group window after unbounded groupBy
+  @Test
+  @Ignore // cannot pass rowtime through non-windowed aggregation
+  def testGroupWindowAfterGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number, 'rowtime.rowtime)
+    val defaultStatus = "false, Acc"
+
+    val resultTable = table
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .window(Tumble over 50.milli on 'rowtime as 'w)
+      .groupBy('w, 'count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            unaryNode(
+              "DataStreamGroupAggregate",
+              "DataStreamScan(true, Acc)",
+              "true, AccRetract"
+            ),
+            "true, AccRetract"
+          ),
+          s"$defaultStatus"
+        ),
+        s"$defaultStatus"
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // over window
+  @Test
+  def testOverWindow(): Unit = {
+    val util = streamTestForRetractionUtil()
+    util.addTable[(String, Int)]("T1", 'word, 'number, 'proctime.proctime)
+    val defaultStatus = "false, Acc"
+
+    val sqlQuery =
+      "SELECT " +
+        "word, count(number) " +
+        "OVER (PARTITION BY word ORDER BY proctime " +
+        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+        "FROM T1"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          "DataStreamScan(true, Acc)",
+          s"$defaultStatus"
+        ),
+        s"$defaultStatus"
+      )
+
+    util.verifySqlTrait(sqlQuery, expected)
+  }
+
+
+  // over window after unbounded groupBy
+  @Test
+  @Ignore // cannot pass rowtime through non-windowed aggregation
+  def testOverWindowAfterGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    util.addTable[(String, Int)]("T1", 'word, 'number, 'proctime.proctime)
+    val defaultStatus = "false, Acc"
+
+    val sqlQuery =
+      "SELECT " +
+        "_count, count(word) " +
+        "OVER (PARTITION BY _count ORDER BY proctime " +
+        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+        "FROM " +
+        "(SELECT word, count(number) as _count FROM T1 GROUP BY word) "
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            unaryNode(
+              "DataStreamGroupAggregate",
+              "DataStreamScan(true, Acc)",
+              "true, AccRetract"
+            ),
+            "true, AccRetract"
+          ),
+          s"$defaultStatus"
+        ),
+        s"$defaultStatus"
+      )
+
+    util.verifySqlTrait(sqlQuery, expected)
+  }
+
+  // test binaryNode
+  @Test
+  def testBinaryNode(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(String, Int)]('word, 'number)
+    val rTable = util.addTable[(String, Long)]('word_r, 'count_r)
+    val defaultStatus = "false, Acc"
+
+    val resultTable = lTable
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .unionAll(rTable)
+      .groupBy('count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            binaryNode(
+              "DataStreamUnion",
+              unaryNode(
+                "DataStreamCalc",
+                unaryNode(
+                  "DataStreamGroupAggregate",
+                  "DataStreamScan(true, Acc)",
+                  "true, AccRetract"
+                ),
+                "true, AccRetract"
+              ),
+              "DataStreamScan(true, Acc)",
+              "true, AccRetract"
+            ),
+            "true, AccRetract"
+          ),
+          s"$defaultStatus"
+        ),
+        s"$defaultStatus"
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+}
+
+class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
+
+  def verifySqlTrait(query: String, expected: String): Unit = {
+    verifyTableTrait(tEnv.sql(query), expected)
+  }
+
+  def verifyTableTrait(resultTable: Table, expected: String): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    val actual = TraitUtil.toString(optimized)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
+  }
+}
+
+object TraitUtil {
+  def toString(rel: RelNode): String = {
+    val className = rel.getClass.getSimpleName
+    var childString: String = ""
+    var i = 0
+    while (i < rel.getInputs.size()) {
+      childString += TraitUtil.toString(rel.getInput(i))
+      i += 1
+    }
+
+    val retractString = rel.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE).toString
+    val accModeString = rel.getTraitSet.getTrait(AccModeTraitDef.INSTANCE).toString
+
+    s"""$className($retractString, $accModeString)
+       |$childString
+       |""".stripMargin.stripLineEnd
+  }
+}
+