You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:49 UTC
[42/50] [abbrv] flink git commit: [FLINK-6090] [table] Add
RetractionRules for annotating AccMode to DataStreamRel nodes.
[FLINK-6090] [table] Add RetractionRules for annotating AccMode to DataStreamRel nodes.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/455a3c58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/455a3c58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/455a3c58
Branch: refs/heads/table-retraction
Commit: 455a3c589883253d52e0ba119ff615548cc20b67
Parents: 24fa1a1
Author: hequn.chq <he...@alibaba-inc.com>
Authored: Fri Apr 7 13:12:04 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 3 11:27:12 2017 +0200
----------------------------------------------------------------------
.../nodes/datastream/DataStreamCorrelate.scala | 6 +-
.../datastream/DataStreamGroupAggregate.scala | 6 +
.../DataStreamGroupWindowAggregate.scala | 4 +
.../datastream/DataStreamOverAggregate.scala | 4 +
.../plan/nodes/datastream/DataStreamRel.scala | 17 +
.../nodes/datastream/retractionTraitDefs.scala | 81 +++++
.../nodes/datastream/retractionTraits.scala | 100 ++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 7 +-
.../datastream/DataStreamRetractionRules.scala | 248 ++++++++++++++
.../flink/table/CalciteConfigBuilderTest.scala | 39 +--
.../table/plan/rules/RetractionRulesTest.scala | 330 +++++++++++++++++++
11 files changed, 818 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 342920a..7680904 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -36,14 +36,14 @@ import org.apache.flink.types.Row
class DataStreamCorrelate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- inputNode: RelNode,
+ input: RelNode,
scan: FlinkLogicalTableFunctionScan,
condition: Option[RexNode],
relRowType: RelDataType,
joinRowType: RelDataType,
joinType: SemiJoinType,
ruleDescription: String)
- extends SingleRel(cluster, traitSet, inputNode)
+ extends SingleRel(cluster, traitSet, input)
with CommonCorrelate
with DataStreamRel {
@@ -84,7 +84,7 @@ class DataStreamCorrelate(
val config = tableEnv.getConfig
// we do not need to specify input type
- val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
val rexCall = funcRel.getCall.asInstanceOf[RexCall]
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 955d702..d88c72b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -58,6 +58,12 @@ class DataStreamGroupAggregate(
override def deriveRowType() = rowRelDataType
+  // A non-windowed group aggregate requires retractions from its input, consumes them,
+  // and emits updates for previously produced results.
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  override def producesUpdates: Boolean = true
+
+  override def consumesRetractions: Boolean = true
+
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamGroupAggregate(
cluster,
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 752dbbe..8959b23 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -55,6 +55,10 @@ class DataStreamGroupWindowAggregate(
override def deriveRowType(): RelDataType = rowRelDataType
+  // Requires retraction messages from its input and consumes them (does not forward them).
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  override def consumesRetractions: Boolean = true
+
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamGroupWindowAggregate(
window,
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 2224752..031d533 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -51,6 +51,10 @@ class DataStreamOverAggregate(
override def deriveRowType(): RelDataType = rowRelDataType
+  // Requires retraction messages from its input and consumes them (does not forward them).
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  override def consumesRetractions: Boolean = true
+
override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
new DataStreamOverAggregate(
logicWindow,
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index ae172a5..50d1d06 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -33,5 +33,22 @@ trait DataStreamRel extends FlinkRelNode {
*/
def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
+  /**
+   * Returns true if this [[DataStreamRel]] requires that update and delete changes are
+   * delivered to it as retraction messages. Defaults to false.
+   */
+  def needsUpdatesAsRetraction: Boolean = false
+
+  /**
+   * Returns true if this [[DataStreamRel]] emits update and delete changes. Defaults to false.
+   */
+  def producesUpdates: Boolean = false
+
+  /**
+   * Returns true if this [[DataStreamRel]] consumes incoming retraction messages rather than
+   * forwarding them downstream. Whether the node emits retraction messages of its own is a
+   * separate question. Defaults to false.
+   */
+  def consumesRetractions: Boolean = false
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala
new file mode 100644
index 0000000..c43d951
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraitDefs.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef}
+import org.apache.calcite.rel.RelNode
+
+/**
+ * [[RelTraitDef]] of the [[UpdateAsRetractionTrait]].
+ *
+ * A conversion between two traits of this kind never needs a converter node: the node is
+ * copied with the requested trait added to its trait set.
+ */
+class UpdateAsRetractionTraitDef extends RelTraitDef[UpdateAsRetractionTrait] {
+
+  /** Returns a copy of `rel` (same inputs) whose trait set contains `toTrait`. */
+  override def convert(
+      planner: RelOptPlanner,
+      rel: RelNode,
+      toTrait: UpdateAsRetractionTrait,
+      allowInfiniteCostConverters: Boolean): RelNode = {
+    val convertedTraits = rel.getTraitSet.plus(toTrait)
+    rel.copy(convertedTraits, rel.getInputs)
+  }
+
+  /** Every [[UpdateAsRetractionTrait]] can be converted into every other one. */
+  override def canConvert(
+      planner: RelOptPlanner,
+      fromTrait: UpdateAsRetractionTrait,
+      toTrait: UpdateAsRetractionTrait): Boolean = true
+
+  override def getTraitClass: Class[UpdateAsRetractionTrait] = classOf[UpdateAsRetractionTrait]
+
+  override def getSimpleName: String = getClass.getSimpleName
+
+  override def getDefault: UpdateAsRetractionTrait = UpdateAsRetractionTrait.DEFAULT
+}
+
+/** Holds the shared [[UpdateAsRetractionTraitDef]] instance. */
+object UpdateAsRetractionTraitDef {
+  val INSTANCE: UpdateAsRetractionTraitDef = new UpdateAsRetractionTraitDef
+}
+
+/**
+ * [[RelTraitDef]] of the [[AccModeTrait]].
+ *
+ * A conversion between two traits of this kind never needs a converter node: the node is
+ * copied with the requested trait added to its trait set.
+ */
+class AccModeTraitDef extends RelTraitDef[AccModeTrait] {
+
+  /** Returns a copy of `rel` (same inputs) whose trait set contains `toTrait`. */
+  override def convert(
+      planner: RelOptPlanner,
+      rel: RelNode,
+      toTrait: AccModeTrait,
+      allowInfiniteCostConverters: Boolean): RelNode = {
+    val convertedTraits = rel.getTraitSet.plus(toTrait)
+    rel.copy(convertedTraits, rel.getInputs)
+  }
+
+  /** Every [[AccModeTrait]] can be converted into every other one. */
+  override def canConvert(
+      planner: RelOptPlanner,
+      fromTrait: AccModeTrait,
+      toTrait: AccModeTrait): Boolean = true
+
+  override def getTraitClass: Class[AccModeTrait] = classOf[AccModeTrait]
+
+  override def getSimpleName: String = getClass.getSimpleName
+
+  override def getDefault: AccModeTrait = AccModeTrait.DEFAULT
+}
+
+/** Holds the shared [[AccModeTraitDef]] instance. */
+object AccModeTraitDef {
+  val INSTANCE: AccModeTraitDef = new AccModeTraitDef
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
new file mode 100644
index 0000000..c3b43ba
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
+import org.apache.flink.table.plan.nodes.datastream.AccMode.AccMode
+
+/**
+ * Tracks if a [[org.apache.calcite.rel.RelNode]] needs to send update and delete changes as
+ * retraction messages.
+ *
+ * New instances are created at runtime (e.g. `new UpdateAsRetractionTrait(true)`), so the
+ * trait implements value-based `equals` and `hashCode`. Calcite requires this for correct
+ * trait canonization, and `satisfies` relies on it.
+ *
+ * @param updateAsRetraction whether the node needs to send update and delete changes as
+ *                           retraction messages
+ */
+class UpdateAsRetractionTrait(updateAsRetraction: Boolean) extends RelTrait {
+
+  /** Creates a trait that does not send updates as retractions (kept for compatibility). */
+  def this() = this(false)
+
+  def sendsUpdatesAsRetractions: Boolean = updateAsRetraction
+
+  override def register(planner: RelOptPlanner): Unit = { }
+
+  override def getTraitDef: RelTraitDef[_ <: RelTrait] = UpdateAsRetractionTraitDef.INSTANCE
+
+  override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`)
+
+  override def equals(other: Any): Boolean = other match {
+    case that: UpdateAsRetractionTrait =>
+      updateAsRetraction == that.sendsUpdatesAsRetractions
+    case _ => false
+  }
+
+  override def hashCode(): Int = updateAsRetraction.hashCode()
+
+  override def toString: String = updateAsRetraction.toString
+
+}
+
+object UpdateAsRetractionTrait {
+  val DEFAULT = new UpdateAsRetractionTrait(false)
+}
+
+/**
+ * Tracks the [[AccMode]] of a [[org.apache.calcite.rel.RelNode]].
+ *
+ * New instances are created at runtime (e.g. `new AccModeTrait(AccMode.AccRetract)`), so the
+ * trait implements value-based `equals` and `hashCode`. Calcite requires this for correct
+ * trait canonization, and `satisfies` relies on it.
+ *
+ * @param accMode the accumulating mode of the operator
+ */
+class AccModeTrait(accMode: AccMode) extends RelTrait {
+
+  /** Creates a trait in [[AccMode.Acc]] mode (kept for compatibility). */
+  def this() = this(AccMode.Acc)
+
+  def getAccMode: AccMode = accMode
+
+  override def register(planner: RelOptPlanner): Unit = { }
+
+  override def getTraitDef: RelTraitDef[_ <: RelTrait] = AccModeTraitDef.INSTANCE
+
+  override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`)
+
+  override def equals(other: Any): Boolean = other match {
+    case that: AccModeTrait => accMode == that.getAccMode
+    case _ => false
+  }
+
+  override def hashCode(): Int = accMode.hashCode()
+
+  override def toString: String = accMode.toString
+}
+
+object AccModeTrait {
+  val DEFAULT = new AccModeTrait(AccMode.Acc)
+}
+
+/**
+ * The AccMode indicates which kinds of messages a [[org.apache.calcite.rel.RelNode]] might
+ * produce.
+ * - In [[AccMode.Acc]], the node only emits accumulate messages.
+ * - In [[AccMode.AccRetract]], the node produces accumulate messages for insert changes,
+ *   retraction messages for delete changes, and accumulate and retraction messages
+ *   for update changes.
+ */
+object AccMode extends Enumeration {
+  type AccMode = Value
+
+  /** Operator produces only accumulate (insert) messages. */
+  val Acc: Value = Value
+
+  /** Operator produces accumulate (insert, update) and retraction (delete, update) messages. */
+  val AccRetract: Value = Value
+}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index c16b469..ab3db56 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.rules
import org.apache.calcite.rel.rules._
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.plan.rules.common._
+import org.apache.flink.table.plan.rules.logical._
import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
-import org.apache.flink.table.plan.rules.logical._
import org.apache.flink.table.plan.nodes.logical._
object FlinkRuleSets {
@@ -188,7 +188,10 @@ object FlinkRuleSets {
* RuleSet to decorate plans for stream / DataStream execution
*/
val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
- // rules
+ // retraction rules
+ DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE,
+ DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
+ DataStreamRetractionRules.ACCMODE_INSTANCE
)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
new file mode 100644
index 0000000..aeb67b6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule.{operand, _}
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.plan.nodes.datastream._
+
+import scala.collection.JavaConverters._
+
+/**
+ * Collection of rules to annotate [[DataStreamRel]] nodes with retraction information.
+ *
+ * The rules have to be applied in the following order:
+ * - [[DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE]]
+ * - [[DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE]]
+ * - [[DataStreamRetractionRules.ACCMODE_INSTANCE]]
+ *
+ * The rules will assign an [[AccModeTrait]] to each [[DataStreamRel]] node of the plan. The
+ * trait defines the [[AccMode]] of a node.
+ * - [[AccMode.Acc]] defines that the node produces only accumulate messages, i.e., all types of
+ *   modifications (insert, update, delete) are encoded as accumulate messages.
+ * - [[AccMode.AccRetract]] defines that the node produces accumulate and retraction messages.
+ *   Insert modifications are encoded as accumulate message, delete modifications as retraction
+ *   message, and update modifications as a pair of accumulate and retraction messages.
+ */
+object DataStreamRetractionRules {
+
+  /**
+   * Rule instance that assigns default retraction to [[DataStreamRel]] nodes.
+   */
+  val DEFAULT_RETRACTION_INSTANCE = new AssignDefaultRetractionRule()
+
+  /**
+   * Rule instance that checks if [[DataStreamRel]] nodes need to ship updates as retractions.
+   */
+  val UPDATES_AS_RETRACTION_INSTANCE = new SetUpdatesAsRetractionRule()
+
+  /**
+   * Rule instance that assigns the [[AccMode]] to [[DataStreamRel]] nodes.
+   */
+  val ACCMODE_INSTANCE = new SetAccModeRule()
+
+  /**
+   * Get all children RelNodes of a RelNode.
+   *
+   * Inputs wrapped in a [[HepRelVertex]] are unwrapped to their current RelNode; any other
+   * input is returned unchanged instead of failing with a ClassCastException.
+   *
+   * @param parent The parent RelNode
+   * @return All child nodes
+   */
+  def getChildRelNodes(parent: RelNode): Seq[RelNode] = {
+    parent.getInputs.asScala.map {
+      case vertex: HepRelVertex => vertex.getCurrentRel
+      case other => other
+    }
+  }
+
+  /**
+   * Checks if a [[RelNode]] ships updates as retractions.
+   *
+   * @param node The node to check.
+   * @return True if the node ships updates as retractions, false otherwise.
+   */
+  def sendsUpdatesAsRetraction(node: RelNode): Boolean = {
+    val retractionTrait = node.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
+    retractionTrait != null && retractionTrait.sendsUpdatesAsRetractions
+  }
+
+  /**
+   * Rule that assigns the default retraction information to [[DataStreamRel]] nodes.
+   * The default is to not publish updates as retraction messages and [[AccMode.Acc]].
+   */
+  class AssignDefaultRetractionRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "AssignDefaultRetractionRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+      val rel = call.rel(0).asInstanceOf[DataStreamRel]
+      val traits = rel.getTraitSet
+
+      // only add the default traits that are not present yet
+      val traitsWithUpdateAsRetrac = if (!traits.contains(UpdateAsRetractionTraitDef.INSTANCE)) {
+        traits.plus(UpdateAsRetractionTrait.DEFAULT)
+      } else {
+        traits
+      }
+      val traitsWithAccMode =
+        if (!traitsWithUpdateAsRetrac.contains(AccModeTraitDef.INSTANCE)) {
+          traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT)
+        } else {
+          traitsWithUpdateAsRetrac
+        }
+
+      // only transform if a trait was added, otherwise the rule would fire endlessly
+      if (traits != traitsWithAccMode) {
+        call.transformTo(rel.copy(traitsWithAccMode, rel.getInputs))
+      }
+    }
+  }
+
+  /**
+   * Rule that annotates all [[DataStreamRel]] nodes that need to send out update and delete
+   * changes as retraction messages.
+   */
+  class SetUpdatesAsRetractionRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "SetUpdatesAsRetractionRule") {
+
+    /**
+     * Checks if a [[RelNode]] requires that update and delete changes are sent with retraction
+     * messages. Nodes that are not [[DataStreamRel]]s and do not already send updates as
+     * retractions impose no such requirement.
+     */
+    def needsUpdatesAsRetraction(node: RelNode): Boolean = {
+      node match {
+        case _ if sendsUpdatesAsRetraction(node) => true
+        case dsr: DataStreamRel => dsr.needsUpdatesAsRetraction
+        case _ => false
+      }
+    }
+
+    /**
+     * Annotates a [[RelNode]] to send out update and delete changes as retraction messages.
+     */
+    def setUpdatesAsRetraction(relNode: RelNode): RelNode = {
+      val traitSet = relNode.getTraitSet
+      relNode.copy(traitSet.plus(new UpdateAsRetractionTrait(true)), relNode.getInputs)
+    }
+
+    /**
+     * Annotates the children of a parent node with the information that they need to forward
+     * update and delete modifications as retraction messages.
+     *
+     * A child needs to produce retraction messages, if
+     *
+     * 1. its parent requires retraction messages by itself because it is a certain type
+     *    of operator, such as a [[DataStreamGroupAggregate]] or [[DataStreamOverAggregate]], or
+     * 2. its parent requires retraction because its own parent requires retraction
+     *    (transitive requirement).
+     */
+    override def onMatch(call: RelOptRuleCall): Unit = {
+      val parent = call.rel(0).asInstanceOf[DataStreamRel]
+      val children = getChildRelNodes(parent)
+
+      // the requirement depends only on the parent, so evaluate it once for all children
+      val parentNeedsRetraction = needsUpdatesAsRetraction(parent)
+
+      // check if children need to send out retraction messages
+      val newChildren = children.map { c =>
+        if (parentNeedsRetraction && !sendsUpdatesAsRetraction(c)) {
+          setUpdatesAsRetraction(c)
+        } else {
+          c
+        }
+      }
+
+      // update parent if a child was updated
+      if (children != newChildren) {
+        call.transformTo(parent.copy(parent.getTraitSet, newChildren.asJava))
+      }
+    }
+  }
+
+  /**
+   * Sets the [[AccMode]] of [[DataStreamRel]] nodes.
+   */
+  class SetAccModeRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "SetAccModeRule") {
+
+    /**
+     * Checks if a [[RelNode]] produces update and delete changes.
+     * Nodes that are not [[DataStreamRel]]s are assumed not to (the [[DataStreamRel]] default).
+     */
+    def producesUpdates(relNode: RelNode): Boolean = {
+      relNode match {
+        case dsr: DataStreamRel => dsr.producesUpdates
+        case _ => false
+      }
+    }
+
+    /**
+     * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
+     */
+    def isAccRetract(node: RelNode): Boolean = {
+      val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
+      null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
+    }
+
+    /**
+     * Set [[AccMode.AccRetract]] to a [[RelNode]].
+     */
+    def setAccRetract(relNode: RelNode): RelNode = {
+      val traitSet = relNode.getTraitSet
+      relNode.copy(traitSet.plus(new AccModeTrait(AccMode.AccRetract)), relNode.getInputs)
+    }
+
+    /**
+     * Checks if a [[RelNode]] consumes retraction messages instead of forwarding them.
+     * The node might or might not produce new retraction messages.
+     * This is checked by [[producesRetractions()]].
+     * Nodes that are not [[DataStreamRel]]s are assumed to forward retractions.
+     */
+    def consumesRetractions(relNode: RelNode): Boolean = {
+      relNode match {
+        case dsr: DataStreamRel => dsr.consumesRetractions
+        case _ => false
+      }
+    }
+
+    /**
+     * Checks if a [[RelNode]] produces retraction messages.
+     */
+    def producesRetractions(node: RelNode): Boolean = {
+      sendsUpdatesAsRetraction(node) && producesUpdates(node)
+    }
+
+    /**
+     * Checks if a [[RelNode]] forwards retraction messages from its children.
+     */
+    def forwardsRetractions(parent: RelNode, children: Seq[RelNode]): Boolean = {
+      children.exists(c => isAccRetract(c)) && !consumesRetractions(parent)
+    }
+
+    /**
+     * Updates the [[AccMode]] of a [[RelNode]] and its children if necessary.
+     */
+    override def onMatch(call: RelOptRuleCall): Unit = {
+      val parent = call.rel(0).asInstanceOf[DataStreamRel]
+      val children = getChildRelNodes(parent)
+
+      // check if the AccMode of the parent needs to be updated
+      if (!isAccRetract(parent) &&
+        (producesRetractions(parent) || forwardsRetractions(parent, children))) {
+        call.transformTo(setAccRetract(parent))
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index ed29f0f..0c337e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -22,6 +22,7 @@ import org.apache.calcite.rel.rules._
import org.apache.calcite.sql.fun.{OracleSqlOperatorTable, SqlStdOperatorTable}
import org.apache.calcite.tools.RuleSets
import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.junit.Assert._
import org.junit.Test
@@ -51,11 +52,11 @@ class CalciteConfigBuilderTest {
def testRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
- .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
- .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
- .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
- .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
- .build()
+ .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+ .build()
assertFalse(cc.replacesNormRuleSet)
assertTrue(cc.getNormRuleSet.isDefined)
@@ -232,62 +233,62 @@ class CalciteConfigBuilderTest {
def testReplaceDecorationRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
- .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
.build()
assertEquals(true, cc.replacesDecoRuleSet)
assertTrue(cc.getDecoRuleSet.isDefined)
val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
assertEquals(1, cSet.size)
- assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
}
@Test
def testReplaceDecorationAddRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
- .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
- .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
+ .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE))
.build()
assertEquals(true, cc.replacesDecoRuleSet)
assertTrue(cc.getDecoRuleSet.isDefined)
val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
assertEquals(2, cSet.size)
- assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
- assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+ assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+ assertTrue(cSet.contains(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE))
}
@Test
def testAddDecorationRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
- .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
.build()
assertEquals(false, cc.replacesDecoRuleSet)
assertTrue(cc.getDecoRuleSet.isDefined)
val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
assertEquals(1, cSet.size)
- assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
}
@Test
def testAddAddDecorationRules(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()
- .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
- .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
+ DataStreamRetractionRules.ACCMODE_INSTANCE))
.build()
assertEquals(false, cc.replacesDecoRuleSet)
assertTrue(cc.getDecoRuleSet.isDefined)
val cList = cc.getDecoRuleSet.get.iterator().asScala.toList
assertEquals(3, cList.size)
- assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE)
- assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE)
- assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE)
+ assertEquals(cList.head, DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)
+ assertEquals(cList(1), DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE)
+ assertEquals(cList(2), DataStreamRetractionRules.ACCMODE_INSTANCE)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/455a3c58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
new file mode 100644
index 0000000..9eadbcb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.plan.nodes.datastream._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+
+
+/**
+ * Verifies the UpdateAsRetraction and AccMode traits that the retraction rules attach to the
+ * nodes of optimized DataStream plans.
+ *
+ * Expected plans are built with `unaryNode` / `binaryNode` and list one node per line as
+ * `NodeName(updateAsRetraction, accMode)`, the format produced by `TraitUtil.toString`.
+ */
+class RetractionRulesTest extends TableTestBase {
+
+  /** Expected traits (updateAsRetraction = false, accMode = Acc). */
+  private val defaultStatus = "false, Acc"
+
+  /** Expected traits (updateAsRetraction = true, accMode = AccRetract). */
+  private val retractStatus = "true, AccRetract"
+
+  /** Creates a fresh util that can verify the traits of an optimized plan. */
+  def streamTestForRetractionUtil(): StreamTableTestForRetractionUtil = {
+    new StreamTableTestForRetractionUtil()
+  }
+
+  // plain projection: only the scan is expected to remain in the optimized plan
+  @Test
+  def testSelect(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table.select('word, 'number)
+
+    val expected = "DataStreamScan(false, Acc)"
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // one level unbounded groupBy: the scan feeding the aggregate carries updateAsRetraction = true
+  @Test
+  def testGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table
+      .groupBy('word)
+      .select('number.count)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          "DataStreamScan(true, Acc)",
+          defaultStatus
+        ),
+        defaultStatus
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // two level unbounded groupBy: the inner aggregate and its Calc are expected in AccRetract mode
+  @Test
+  def testTwoGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .groupBy('count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            unaryNode(
+              "DataStreamGroupAggregate",
+              "DataStreamScan(true, Acc)",
+              retractStatus
+            ),
+            retractStatus
+          ),
+          defaultStatus
+        ),
+        defaultStatus
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // group window over the scan
+  @Test
+  def testGroupWindow(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table
+      .window(Tumble over 50.milli as 'w)
+      .groupBy('w, 'word)
+      .select('word, 'number.count as 'count)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          "DataStreamScan(true, Acc)",
+          defaultStatus
+        ),
+        defaultStatus
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // group window after unbounded groupBy
+  @Test
+  def testGroupWindowAfterGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .window(Tumble over 50.milli as 'w)
+      .groupBy('w, 'count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            unaryNode(
+              "DataStreamGroupAggregate",
+              "DataStreamScan(true, Acc)",
+              retractStatus
+            ),
+            retractStatus
+          ),
+          defaultStatus
+        ),
+        defaultStatus
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // over window over the scan
+  @Test
+  def testOverWindow(): Unit = {
+    val util = streamTestForRetractionUtil()
+    util.addTable[(String, Int)]("T1", 'word, 'number)
+
+    // note the explicit space before FROM: the fragments are concatenated verbatim
+    val sqlQuery =
+      "SELECT " +
+        "word, count(number) " +
+        "OVER (PARTITION BY word ORDER BY ProcTime() " +
+        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+        "FROM T1"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            "DataStreamScan(true, Acc)",
+            "true, Acc"
+          ),
+          defaultStatus
+        ),
+        defaultStatus
+      )
+
+    util.verifySqlTrait(sqlQuery, expected)
+  }
+
+  // over window after unbounded groupBy
+  @Test
+  def testOverWindowAfterGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    util.addTable[(String, Int)]("T1", 'word, 'number)
+
+    // note the explicit space before FROM: the fragments are concatenated verbatim
+    val sqlQuery =
+      "SELECT " +
+        "_count, count(word) " +
+        "OVER (PARTITION BY _count ORDER BY ProcTime() " +
+        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+        "FROM " +
+        "(SELECT word, count(number) as _count FROM T1 GROUP BY word) "
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            unaryNode(
+              "DataStreamGroupAggregate",
+              "DataStreamScan(true, Acc)",
+              retractStatus
+            ),
+            retractStatus
+          ),
+          defaultStatus
+        ),
+        defaultStatus
+      )
+
+    util.verifySqlTrait(sqlQuery, expected)
+  }
+
+  // binary node: union of an aggregated table (AccRetract) with a plain scan (Acc),
+  // followed by an unbounded groupBy; the union is expected in AccRetract mode
+  @Test
+  def testBinaryNode(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(String, Int)]('word, 'number)
+    val rTable = util.addTable[(String, Long)]('word_r, 'count_r)
+
+    val resultTable = lTable
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .unionAll(rTable)
+      .groupBy('count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            binaryNode(
+              "DataStreamUnion",
+              unaryNode(
+                "DataStreamCalc",
+                unaryNode(
+                  "DataStreamGroupAggregate",
+                  "DataStreamScan(true, Acc)",
+                  retractStatus
+                ),
+                retractStatus
+              ),
+              "DataStreamScan(true, Acc)",
+              retractStatus
+            ),
+            retractStatus
+          ),
+          defaultStatus
+        ),
+        defaultStatus
+      )
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+}
+
+
+/**
+ * Stream test util whose verification methods compare the retraction-related traits of an
+ * optimized plan (as rendered by `TraitUtil.toString`) against an expected string.
+ */
+class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
+
+  /** Translates `query` with this util's table environment and checks its plan traits. */
+  def verifySqlTrait(query: String, expected: String): Unit = {
+    val sqlTable: Table = tEnv.sql(query)
+    verifyTableTrait(sqlTable, expected)
+  }
+
+  /**
+   * Optimizes the plan of `resultTable` and asserts that its trait rendering equals `expected`.
+   * Every line is trimmed before comparing, so indentation and a trailing newline are ignored.
+   */
+  def verifyTableTrait(resultTable: Table, expected: String): Unit = {
+    val normalize: String => String = text => text.split("\n").map(_.trim).mkString("\n")
+    val optimizedPlan = tEnv.optimize(resultTable.getRelNode)
+    val rendered = TraitUtil.toString(optimizedPlan)
+    assertEquals(normalize(expected), normalize(rendered))
+  }
+}
+
+
+/** Renders a plan tree together with the retraction-related traits of each node. */
+object TraitUtil {
+
+  /**
+   * Renders `rel` and its inputs in pre-order, one node per line, as
+   * `SimpleClassName(updateAsRetractionTrait, accModeTrait)`. Every line, including the last,
+   * ends with a newline, and no indentation is emitted.
+   *
+   * A trait missing from a node's trait set is rendered as `null`, so the calling assertion
+   * reports a readable plan diff instead of failing with a NullPointerException.
+   */
+  def toString(rel: RelNode): String = {
+    val className = rel.getClass.getSimpleName
+    val traitSet = rel.getTraitSet
+    // RelTraitSet.getTrait returns null when the trait def is not part of the set;
+    // string interpolation renders such a value as "null"
+    val retractTrait = traitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
+    val accModeTrait = traitSet.getTrait(AccModeTraitDef.INSTANCE)
+    // children are appended verbatim (never passed through stripMargin), so rendered
+    // descendant lines are not altered again at every level of the tree
+    val childString = (0 until rel.getInputs.size())
+      .map(i => TraitUtil.toString(rel.getInput(i)))
+      .mkString
+    s"$className($retractTrait, $accModeTrait)\n$childString"
+  }
+}
+