You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/08 13:15:33 UTC
flink git commit: [FLINK-5722] [table] Add dedicated DataSetDistinct operator.
Repository: flink
Updated Branches:
refs/heads/master 338c30a41 -> 7a629fc59
[FLINK-5722] [table] Add dedicated DataSetDistinct operator.
- Uses a hash-combiner for a better combine rate.
This closes #3471.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a629fc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a629fc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a629fc5
Branch: refs/heads/master
Commit: 7a629fc59ff206ba51f22e1bf35fe50882e63538
Parents: 338c30a
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Mar 3 23:17:36 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Mar 8 14:14:06 2017 +0100
----------------------------------------------------------------------
.../plan/nodes/dataset/DataSetDistinct.scala | 94 ++++++++++++++++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 1 +
.../rules/dataSet/DataSetAggregateRule.scala | 9 +-
.../rules/dataSet/DataSetDistinctRule.scala | 61 +++++++++++++
.../runtime/aggregate/DistinctReduce.scala | 26 ++++++
.../scala/batch/sql/DistinctAggregateTest.scala | 45 ++++------
.../batch/sql/QueryDecorrelationTest.scala | 10 +--
.../api/scala/batch/sql/SetOperatorsTest.scala | 5 +-
.../scala/batch/table/FieldProjectionTest.scala | 10 +--
9 files changed, 218 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
new file mode 100644
index 0000000..14116f1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.runtime.aggregate.DistinctReduce
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+ * DataSet RelNode for a Distinct, i.e., a LogicalAggregate without aggregation functions
+ * that groups on all of its fields.
+ *
+ * The node is translated into a groupBy on every field followed by a [[DistinctReduce]],
+ * which keeps one row per group. The reduce is configured with CombineHint.HASH.
+ *
+ * @param cluster         the cluster this node belongs to
+ * @param traitSet        the traits of this node
+ * @param input           the input node whose duplicate rows are removed
+ * @param rowRelDataType  the row type of this node; expected to have the same fields as the
+ *                        input's row type, because its field indices are used as grouping
+ *                        keys on the input
+ * @param ruleDescription description of the rule that created this node (carried over by copy)
+ */
+class DataSetDistinct(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ input: RelNode,
+ rowRelDataType: RelDataType,
+ ruleDescription: String)
+ extends SingleRel(cluster, traitSet, input)
+ with DataSetRel {
+
+ // The output row type is the one passed at construction, not re-derived from the input.
+ override def deriveRowType() = rowRelDataType
+
+ /** Creates a copy with the given traits and first input; row type and description are kept. */
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataSetDistinct(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ rowRelDataType,
+ ruleDescription
+ )
+ }
+
+ /**
+ * Estimates the cost as (rows = input row count, cpu = 0,
+ * io = input row count * estimated input row size * 0.9).
+ */
+ override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+ val child = this.getInput
+ val rowCnt = metadata.getRowCount(child)
+ val rowSize = this.estimateRowSize(child.getRowType)
+ // less expensive than DataSetAggregate without aggregates
+ planner.getCostFactory.makeCost(rowCnt, 0, rowCnt * rowSize * 0.9)
+ }
+
+ // Renders as "Distinct(distinct: (<comma-separated field names>))".
+ override def toString: String = {
+ s"Distinct(distinct: (${rowTypeToString(rowRelDataType)}))"
+ }
+
+ // Adds the output field names as the "distinct" term of the plan explanation.
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw).item("distinct", rowTypeToString(rowRelDataType))
+ }
+
+ /** Returns the field names of the given row type, joined with ", ". */
+ def rowTypeToString(rowType: RelDataType): String = {
+ rowType.getFieldList.asScala.map(_.getName).mkString(", ")
+ }
+
+ /**
+ * Translates the input and removes duplicates by grouping on every field and
+ * keeping one row per group. The result has the same type as the input DataSet.
+ */
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ // Indices are taken from this node's row type but applied to the input, so both row
+ // types are expected to have the same fields.
+ val groupKeys = (0 until rowRelDataType.getFieldCount).toArray // group on all fields
+
+ inputDS
+ .groupBy(groupKeys: _*)
+ .reduce(new DistinctReduce)
+ .setCombineHint(CombineHint.HASH) // use hash-combiner
+ .name("distinct")
+ .returns(inputDS.getType)
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 39e4353..3b20236 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -110,6 +110,7 @@ object FlinkRuleSets {
DataSetWindowAggregateRule.INSTANCE,
DataSetAggregateRule.INSTANCE,
DataSetAggregateWithNullValuesRule.INSTANCE,
+ DataSetDistinctRule.INSTANCE,
DataSetCalcRule.INSTANCE,
DataSetJoinRule.INSTANCE,
DataSetSingleRowJoinRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
index 9c0acdd..98d1c13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTrait
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention, DataSetUnion}
import scala.collection.JavaConversions._
@@ -42,6 +41,14 @@ class DataSetAggregateRule
return false
}
+ // distinct is translated into dedicated operator
+ if (agg.getAggCallList.isEmpty &&
+ agg.getGroupCount == agg.getRowType.getFieldCount &&
+ agg.getRowType.equals(agg.getInput.getRowType) &&
+ agg.getGroupSets.size() == 1) {
+ return false
+ }
+
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala
new file mode 100644
index 0000000..644ff9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetDistinct}
+
+/**
+ * Converts a LogicalAggregate (Convention.NONE) that only removes duplicate rows into a
+ * [[DataSetDistinct]] in the DataSet convention.
+ */
+class DataSetDistinctRule
+ extends ConverterRule(
+ classOf[LogicalAggregate],
+ Convention.NONE,
+ DataSetConvention.INSTANCE,
+ "DataSetDistinctRule")
+ {
+
+ /**
+ * Accepts only aggregates that have no aggregate calls, group on every output field,
+ * have the same row type as their input, and have exactly one grouping set.
+ */
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+ // only accept distinct
+ // NOTE: DataSetAggregateRule rejects aggregates with this same condition, so the two
+ // checks must be kept in sync to avoid leaving such aggregates unconverted.
+ agg.getAggCallList.isEmpty &&
+ agg.getGroupCount == agg.getRowType.getFieldCount &&
+ agg.getRowType.equals(agg.getInput.getRowType) &&
+ agg.getGroupSets.size() == 1
+ }
+
+ /**
+ * Replaces the convention in the aggregate's traits, converts its input to the DataSet
+ * convention, and creates a DataSetDistinct with the aggregate's row type.
+ */
+ def convert(rel: RelNode): RelNode = {
+ val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+ val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
+
+ new DataSetDistinct(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ agg.getRowType,
+ description)
+ }
+ }
+
+/** Shared instance of [[DataSetDistinctRule]], as registered in FlinkRuleSets. */
+object DataSetDistinctRule {
+ val INSTANCE: RelOptRule = new DataSetDistinctRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala
new file mode 100644
index 0000000..f440573
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.types.Row
+
+/**
+ * ReduceFunction that keeps the first of two rows and discards the second.
+ *
+ * Used by DataSetDistinct after grouping on all fields. Rows of the same group are equal on
+ * every grouping key, so returning either row yields one representative per group.
+ */
+class DistinctReduce extends ReduceFunction[Row] {
+ override def reduce(value1: Row, value2: Row): Row = value1
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
index 38e4ea8..54b4d24 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
@@ -40,14 +40,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetValues",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a")
),
- term("groupBy", "a"),
- term("select", "a")
+ term("distinct", "a")
),
tuples(List(null)),
term("values", "a")
@@ -74,14 +73,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetValues",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a")
),
- term("groupBy", "a"),
- term("select", "a")
+ term("distinct", "a")
),
tuples(List(null)),
term("values", "a")
@@ -174,14 +172,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetValues",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a")
),
- term("groupBy", "a"),
- term("select", "a")
+ term("distinct", "a")
),
tuples(List(null)),
term("values", "a")
@@ -197,14 +194,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetValues",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "b")
),
- term("groupBy", "b"),
- term("select", "b")
+ term("distinct", "b")
),
tuples(List(null)),
term("values", "b")
@@ -255,14 +251,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetValues",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a")
),
- term("groupBy", "a"),
- term("select", "a")
+ term("distinct", "a")
),
tuples(List(null)),
term("values", "a")
@@ -282,14 +277,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetValues",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "b")
),
- term("groupBy", "b"),
- term("select", "b")
+ term("distinct", "b")
),
tuples(List(null)),
term("values", "b")
@@ -384,14 +378,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetAggregate",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a", "b")
),
- term("groupBy", "a, b"),
- term("select", "a, b")
+ term("distinct", "a, b")
),
term("groupBy", "a"),
term("select", "a, SUM(b) AS EXPR$2, COUNT(b) AS EXPR$3")
@@ -430,14 +423,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetAggregate",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a", "b")
),
- term("groupBy", "a, b"),
- term("select", "a, b")
+ term("distinct", "a, b")
),
term("groupBy", "a"),
term("select", "a, SUM(b) AS EXPR$2")
@@ -451,14 +443,13 @@ class DistinctAggregateTest extends TableTestBase {
unaryNode(
"DataSetAggregate",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a", "c")
),
- term("groupBy", "a, c"),
- term("select", "a, c")
+ term("distinct", "a, c")
),
term("groupBy", "a"),
term("select", "a, COUNT(c) AS EXPR$3")
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
index 516fcd2..3e44526 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
@@ -55,7 +55,7 @@ class QueryDecorrelationTest extends TableTestBase {
term("select", "empno", "salary")
),
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
binaryNode(
@@ -78,8 +78,7 @@ class QueryDecorrelationTest extends TableTestBase {
),
term("select", "empno")
),
- term("groupBy", "empno"),
- term("select", "empno")
+ term("distinct", "empno")
),
term("where", "=(empno0, empno)"),
term("join", "empno", "salary", "empno0"),
@@ -145,7 +144,7 @@ class QueryDecorrelationTest extends TableTestBase {
term("select", "salary", "deptno")
),
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
binaryNode(
@@ -166,8 +165,7 @@ class QueryDecorrelationTest extends TableTestBase {
),
term("select", "deptno0")
),
- term("groupBy", "deptno0"),
- term("select", "deptno0")
+ term("distinct", "deptno0")
),
term("where", "=(deptno, deptno0)"),
term("join", "salary", "deptno", "deptno0"),
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
index d70a32a..f902338 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -66,14 +66,13 @@ class SetOperatorsTest extends TableTestBase {
term("select", "b_long")
),
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a_long")
),
- term("groupBy", "a_long"),
- term("select", "a_long")
+ term("distinct", "a_long")
),
term("where", "=(a_long, b_long)"),
term("join", "b_long", "a_long"),
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 0066ad2..a0412d5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -117,14 +117,13 @@ class FieldProjectionTest extends TableTestBase {
val expected = unaryNode(
"DataSetCalc",
unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a", "c")
),
- term("groupBy", "a", "c"),
- term("select", "a", "c")
+ term("distinct", "a", "c")
),
term("select", "a")
)
@@ -138,14 +137,13 @@ class FieldProjectionTest extends TableTestBase {
val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
val expected = unaryNode(
- "DataSetAggregate",
+ "DataSetDistinct",
unaryNode(
"DataSetCalc",
batchTableNode(0),
term("select", "a", "c")
),
- term("groupBy", "a", "c"),
- term("select", "a", "c")
+ term("distinct", "a", "c")
)
util.verifyTable(resultTable, expected)