You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/08 13:15:33 UTC

flink git commit: [FLINK-5722] [table] Add dedicated DataSetDistinct operator.

Repository: flink
Updated Branches:
  refs/heads/master 338c30a41 -> 7a629fc59


[FLINK-5722] [table] Add dedicated DataSetDistinct operator.

- Uses a hash-combiner for a better combine rate.

This closes #3471.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a629fc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a629fc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a629fc5

Branch: refs/heads/master
Commit: 7a629fc59ff206ba51f22e1bf35fe50882e63538
Parents: 338c30a
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Mar 3 23:17:36 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Mar 8 14:14:06 2017 +0100

----------------------------------------------------------------------
 .../plan/nodes/dataset/DataSetDistinct.scala    | 94 ++++++++++++++++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  1 +
 .../rules/dataSet/DataSetAggregateRule.scala    |  9 +-
 .../rules/dataSet/DataSetDistinctRule.scala     | 61 +++++++++++++
 .../runtime/aggregate/DistinctReduce.scala      | 26 ++++++
 .../scala/batch/sql/DistinctAggregateTest.scala | 45 ++++------
 .../batch/sql/QueryDecorrelationTest.scala      | 10 +--
 .../api/scala/batch/sql/SetOperatorsTest.scala  |  5 +-
 .../scala/batch/table/FieldProjectionTest.scala | 10 +--
 9 files changed, 218 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
new file mode 100644
index 0000000..14116f1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.runtime.aggregate.DistinctReduce
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * DataSet RelNode for a Distinct, i.e., a LogicalAggregate without aggregation functions that
+  * groups on all fields of its input.
+  *
+  * @param cluster         cluster of this node
+  * @param traitSet        trait set of this node
+  * @param input           input node whose duplicate rows are removed
+  * @param rowRelDataType  row type of this node; DataSetDistinctRule only creates this node if
+  *                        it is equal to the row type of the input
+  * @param ruleDescription description of the rule that created this node
+  */
+class DataSetDistinct(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowRelDataType: RelDataType,
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetDistinct(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowRelDataType,
+      ruleDescription
+    )
+  }
+
+  /**
+    * Estimates the cost from the input row count and row size with zero CPU cost. The IO cost
+    * is scaled by 0.9, which is intended to make a distinct cheaper than a DataSetAggregate
+    * without aggregates.
+    */
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    val rowSize = this.estimateRowSize(child.getRowType)
+    planner.getCostFactory.makeCost(rowCnt, 0, rowCnt * rowSize * 0.9)
+  }
+
+  override def toString: String = {
+    s"Distinct(distinct: (${rowTypeToString(rowRelDataType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("distinct", rowTypeToString(rowRelDataType))
+  }
+
+  /** Returns the field names of the given row type as a comma-separated string. */
+  def rowTypeToString(rowType: RelDataType): String = {
+    rowType.getFieldList.asScala.map(_.getName).mkString(", ")
+  }
+
+  /**
+    * Translates this node into a DataSet program that groups the input on all fields and
+    * reduces each group to a single row, using a hash-based combiner.
+    */
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val groupKeys = (0 until rowRelDataType.getFieldCount).toArray // group on all fields
+
+    inputDS
+      .groupBy(groupKeys: _*)
+      .reduce(new DistinctReduce) // rows of a group are equal on all fields; keep one of them
+      .setCombineHint(CombineHint.HASH) // hash-combiner for a better combine rate
+      .name("distinct")
+      .returns(inputDS.getType)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 39e4353..3b20236 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -110,6 +110,7 @@ object FlinkRuleSets {
     DataSetWindowAggregateRule.INSTANCE,
     DataSetAggregateRule.INSTANCE,
     DataSetAggregateWithNullValuesRule.INSTANCE,
+    DataSetDistinctRule.INSTANCE,
     DataSetCalcRule.INSTANCE,
     DataSetJoinRule.INSTANCE,
     DataSetSingleRowJoinRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
index 9c0acdd..98d1c13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTrait
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention, DataSetUnion}
 import scala.collection.JavaConversions._
 
@@ -42,6 +41,14 @@ class DataSetAggregateRule
       return false
     }
 
+    // distinct is translated into dedicated operator
+    if (agg.getAggCallList.isEmpty &&
+      agg.getGroupCount == agg.getRowType.getFieldCount &&
+      agg.getRowType.equals(agg.getInput.getRowType) &&
+      agg.getGroupSets.size() == 1) {
+      return false
+    }
+
     // check if we have distinct aggregates
     val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala
new file mode 100644
index 0000000..644ff9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetDistinct}
+
+/**
+  * Converts a [[LogicalAggregate]] that is a plain distinct into a [[DataSetDistinct]].
+  *
+  * An aggregate is treated as a plain distinct if it has no aggregation calls, a single
+  * grouping set, and the same row type as its input, i.e., it groups on all input fields.
+  */
+class DataSetDistinctRule
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetDistinctRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val aggregate: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+    isDistinct(aggregate)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val aggregate = rel.asInstanceOf[LogicalAggregate]
+    val dataSetTraits: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val dataSetInput: RelNode = RelOptRule.convert(aggregate.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetDistinct(
+      rel.getCluster,
+      dataSetTraits,
+      dataSetInput,
+      aggregate.getRowType,
+      description)
+  }
+
+  /** Returns true if the aggregate only removes duplicate rows from its input. */
+  private def isDistinct(agg: LogicalAggregate): Boolean =
+    agg.getAggCallList.isEmpty &&
+      agg.getGroupCount == agg.getRowType.getFieldCount &&
+      agg.getRowType.equals(agg.getInput.getRowType) &&
+      agg.getGroupSets.size() == 1
+}
+
+object DataSetDistinctRule {
+  val INSTANCE: RelOptRule = new DataSetDistinctRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala
new file mode 100644
index 0000000..f440573
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.types.Row
+
+/**
+  * [[ReduceFunction]] that keeps a single row out of a group of duplicate rows.
+  *
+  * Intended for groups that are keyed on all fields (as done by DataSetDistinct), in which all
+  * rows are equal, so returning the first of the two given rows is sufficient.
+  */
+class DistinctReduce extends ReduceFunction[Row] {
+
+  override def reduce(value1: Row, value2: Row): Row = {
+    // under the intended use value2 duplicates value1, so it is simply dropped
+    value1
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
index 38e4ea8..54b4d24 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
@@ -40,14 +40,13 @@ class DistinctAggregateTest extends TableTestBase {
         unaryNode(
           "DataSetValues",
           unaryNode(
-            "DataSetAggregate",
+            "DataSetDistinct",
             unaryNode(
               "DataSetCalc",
               batchTableNode(0),
               term("select", "a")
             ),
-            term("groupBy", "a"),
-            term("select", "a")
+            term("distinct", "a")
           ),
           tuples(List(null)),
           term("values", "a")
@@ -74,14 +73,13 @@ class DistinctAggregateTest extends TableTestBase {
         unaryNode(
           "DataSetValues",
           unaryNode(
-            "DataSetAggregate",
+            "DataSetDistinct",
             unaryNode(
               "DataSetCalc",
               batchTableNode(0),
               term("select", "a")
             ),
-            term("groupBy", "a"),
-            term("select", "a")
+            term("distinct", "a")
           ),
           tuples(List(null)),
           term("values", "a")
@@ -174,14 +172,13 @@ class DistinctAggregateTest extends TableTestBase {
           unaryNode(
             "DataSetValues",
             unaryNode(
-              "DataSetAggregate",
+              "DataSetDistinct",
               unaryNode(
                 "DataSetCalc",
                 batchTableNode(0),
                 term("select", "a")
               ),
-              term("groupBy", "a"),
-              term("select", "a")
+              term("distinct", "a")
             ),
             tuples(List(null)),
             term("values", "a")
@@ -197,14 +194,13 @@ class DistinctAggregateTest extends TableTestBase {
           unaryNode(
             "DataSetValues",
             unaryNode(
-              "DataSetAggregate",
+              "DataSetDistinct",
               unaryNode(
                 "DataSetCalc",
                 batchTableNode(0),
                 term("select", "b")
               ),
-              term("groupBy", "b"),
-              term("select", "b")
+              term("distinct", "b")
             ),
             tuples(List(null)),
             term("values", "b")
@@ -255,14 +251,13 @@ class DistinctAggregateTest extends TableTestBase {
               unaryNode(
                 "DataSetValues",
                 unaryNode(
-                  "DataSetAggregate",
+                  "DataSetDistinct",
                   unaryNode(
                     "DataSetCalc",
                     batchTableNode(0),
                     term("select", "a")
                   ),
-                  term("groupBy", "a"),
-                  term("select", "a")
+                  term("distinct", "a")
                 ),
                 tuples(List(null)),
                 term("values", "a")
@@ -282,14 +277,13 @@ class DistinctAggregateTest extends TableTestBase {
             unaryNode(
               "DataSetValues",
               unaryNode(
-                "DataSetAggregate",
+                "DataSetDistinct",
                 unaryNode(
                   "DataSetCalc",
                   batchTableNode(0),
                   term("select", "b")
                 ),
-                term("groupBy", "b"),
-                term("select", "b")
+                term("distinct", "b")
               ),
               tuples(List(null)),
               term("values", "b")
@@ -384,14 +378,13 @@ class DistinctAggregateTest extends TableTestBase {
         unaryNode(
           "DataSetAggregate",
           unaryNode(
-            "DataSetAggregate",
+            "DataSetDistinct",
             unaryNode(
               "DataSetCalc",
               batchTableNode(0),
               term("select", "a", "b")
             ),
-            term("groupBy", "a, b"),
-            term("select", "a, b")
+            term("distinct", "a, b")
           ),
           term("groupBy", "a"),
           term("select", "a, SUM(b) AS EXPR$2, COUNT(b) AS EXPR$3")
@@ -430,14 +423,13 @@ class DistinctAggregateTest extends TableTestBase {
             unaryNode(
               "DataSetAggregate",
               unaryNode(
-                "DataSetAggregate",
+                "DataSetDistinct",
                 unaryNode(
                   "DataSetCalc",
                   batchTableNode(0),
                   term("select", "a", "b")
                 ),
-                term("groupBy", "a, b"),
-                term("select", "a, b")
+                term("distinct", "a, b")
               ),
               term("groupBy", "a"),
               term("select", "a, SUM(b) AS EXPR$2")
@@ -451,14 +443,13 @@ class DistinctAggregateTest extends TableTestBase {
         unaryNode(
           "DataSetAggregate",
           unaryNode(
-            "DataSetAggregate",
+            "DataSetDistinct",
             unaryNode(
               "DataSetCalc",
               batchTableNode(0),
               term("select", "a", "c")
             ),
-            term("groupBy", "a, c"),
-            term("select", "a, c")
+            term("distinct", "a, c")
           ),
           term("groupBy", "a"),
           term("select", "a, COUNT(c) AS EXPR$3")

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
index 516fcd2..3e44526 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
@@ -55,7 +55,7 @@ class QueryDecorrelationTest extends TableTestBase {
             term("select", "empno", "salary")
           ),
           unaryNode(
-            "DataSetAggregate",
+            "DataSetDistinct",
             unaryNode(
               "DataSetCalc",
               binaryNode(
@@ -78,8 +78,7 @@ class QueryDecorrelationTest extends TableTestBase {
               ),
               term("select", "empno")
             ),
-            term("groupBy", "empno"),
-            term("select", "empno")
+            term("distinct", "empno")
           ),
           term("where", "=(empno0, empno)"),
           term("join", "empno", "salary", "empno0"),
@@ -145,7 +144,7 @@ class QueryDecorrelationTest extends TableTestBase {
             term("select", "salary", "deptno")
           ),
           unaryNode(
-            "DataSetAggregate",
+            "DataSetDistinct",
             unaryNode(
               "DataSetCalc",
               binaryNode(
@@ -166,8 +165,7 @@ class QueryDecorrelationTest extends TableTestBase {
               ),
               term("select", "deptno0")
             ),
-            term("groupBy", "deptno0"),
-            term("select", "deptno0")
+            term("distinct", "deptno0")
           ),
           term("where", "=(deptno, deptno0)"),
           term("join", "salary", "deptno", "deptno0"),

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
index d70a32a..f902338 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -66,14 +66,13 @@ class SetOperatorsTest extends TableTestBase {
                 term("select", "b_long")
               ),
               unaryNode(
-                "DataSetAggregate",
+                "DataSetDistinct",
                 unaryNode(
                   "DataSetCalc",
                   batchTableNode(0),
                   term("select", "a_long")
                 ),
-                term("groupBy", "a_long"),
-                term("select", "a_long")
+                term("distinct", "a_long")
               ),
               term("where", "=(a_long, b_long)"),
               term("join", "b_long", "a_long"),

http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 0066ad2..a0412d5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -117,14 +117,13 @@ class FieldProjectionTest extends TableTestBase {
     val expected = unaryNode(
       "DataSetCalc",
       unaryNode(
-        "DataSetAggregate",
+        "DataSetDistinct",
         unaryNode(
           "DataSetCalc",
           batchTableNode(0),
           term("select", "a", "c")
         ),
-        term("groupBy", "a", "c"),
-        term("select", "a", "c")
+        term("distinct", "a", "c")
       ),
       term("select", "a")
     )
@@ -138,14 +137,13 @@ class FieldProjectionTest extends TableTestBase {
     val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
 
     val expected = unaryNode(
-      "DataSetAggregate",
+      "DataSetDistinct",
       unaryNode(
         "DataSetCalc",
         batchTableNode(0),
         term("select", "a", "c")
       ),
-      term("groupBy", "a", "c"),
-      term("select", "a", "c")
+      term("distinct", "a", "c")
     )
 
     util.verifyTable(resultTable, expected)