You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/16 05:53:32 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4842 Supports grouping sets function for Kylin 4

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 64e9365  KYLIN-4842 Supports grouping sets function for Kylin 4
64e9365 is described below

commit 64e93654cdc4ed56bed790fe3a86e57ab78ca02c
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Tue Dec 15 23:45:13 2020 +0800

    KYLIN-4842 Supports grouping sets function for Kylin 4
---
 .../test/resources/query/sql_grouping/query03.sql  | 26 ++++++++++++++++++++++
 .../test/resources/query/sql_grouping/query04.sql  | 26 ++++++++++++++++++++++
 .../test/resources/query/sql_grouping/query05.sql  | 26 ++++++++++++++++++++++
 .../test/resources/query/sql_grouping/query06.sql  | 26 ++++++++++++++++++++++
 .../test/resources/query/sql_grouping/query07.sql  | 24 ++++++++++++++++++++
 .../test/resources/query/sql_grouping/query08.sql  | 24 ++++++++++++++++++++
 .../test/resources/query/sql_grouping/query09.sql  | 24 ++++++++++++++++++++
 .../apache/spark/sql/common/SparkQueryTest.scala   |  4 ++--
 .../kylin/query/runtime/plans/AggregatePlan.scala  | 15 +++++++------
 .../kylin/query/runtime/plans/ValuesPlan.scala     |  3 +--
 .../runtime => spark/sql}/SparkOperation.scala     | 16 ++++++++++---
 .../kylin/engine/spark2/NBuildAndQueryTest.java    |  5 +----
 12 files changed, 201 insertions(+), 18 deletions(-)

diff --git a/kylin-it/src/test/resources/query/sql_grouping/query03.sql b/kylin-it/src/test/resources/query/sql_grouping/query03.sql
new file mode 100644
index 0000000..2adcca5
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_grouping/query03.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+sum(price) as GMV,
+cal_dt as dt,
+(case grouping(slr_segment_cd) when 1 then 'ALL' else cast(slr_segment_cd as varchar(256)) end) as cd,
+(case grouping(lstg_format_name) when 1 then 'ALL' else lstg_format_name end) as name,
+count(*) as TRANS_CNT from test_kylin_fact
+where cal_dt between '2012-01-01' and '2012-02-01'
+group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd), (cal_dt, slr_segment_cd), (lstg_format_name, slr_segment_cd))
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_grouping/query04.sql b/kylin-it/src/test/resources/query/sql_grouping/query04.sql
new file mode 100644
index 0000000..fea6da4
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_grouping/query04.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+sum(price) as GMV,
+cal_dt as dt,
+slr_segment_cd as cd,
+lstg_format_name as name,
+count(*) as TRANS_CNT from test_kylin_fact
+where cal_dt between '2012-01-01' and '2012-02-01'
+group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd), (cal_dt, slr_segment_cd), (lstg_format_name, slr_segment_cd))
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_grouping/query05.sql b/kylin-it/src/test/resources/query/sql_grouping/query05.sql
new file mode 100644
index 0000000..1a0eb4d
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_grouping/query05.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+sum(price) as GMV,
+grouping(cal_dt) as dt,
+grouping(slr_segment_cd) as cd,
+grouping(lstg_format_name) as name,
+count(*) as TRANS_CNT from test_kylin_fact
+where cal_dt between '2012-01-01' and '2012-02-01'
+group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd), (cal_dt, slr_segment_cd), (lstg_format_name, slr_segment_cd))
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_grouping/query06.sql b/kylin-it/src/test/resources/query/sql_grouping/query06.sql
new file mode 100644
index 0000000..1fe8be2
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_grouping/query06.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+sum(price) as GMV,
+cal_dt as dt,
+slr_segment_cd as cd,
+lstg_format_name as name,
+count(*) as TRANS_CNT from test_kylin_fact
+where cal_dt between '2012-01-01' and '2012-02-01'
+group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd))
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_grouping/query07.sql b/kylin-it/src/test/resources/query/sql_grouping/query07.sql
new file mode 100644
index 0000000..b0f9923
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_grouping/query07.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+sum(price) as GMV,
+lstg_format_name as name,
+count(*) as TRANS_CNT from test_kylin_fact
+where cal_dt between '2012-01-01' and '2012-02-01'
+group by grouping sets(lstg_format_name)
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_grouping/query08.sql b/kylin-it/src/test/resources/query/sql_grouping/query08.sql
new file mode 100644
index 0000000..923d99f
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_grouping/query08.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+lstg_format_name as name,
+sum(price) as GMV,
+count(*) as TRANS_CNT from test_kylin_fact
+where cal_dt between '2012-01-01' and '2012-02-01'
+group by cube(lstg_format_name)
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_grouping/query09.sql b/kylin-it/src/test/resources/query/sql_grouping/query09.sql
new file mode 100644
index 0000000..55da2bc
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_grouping/query09.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select
+slr_segment_cd as cd,
+sum(price) as GMV,
+count(*) as TRANS_CNT from test_kylin_fact
+where cal_dt between '2012-01-01' and '2012-02-01'
+group by rollup(slr_segment_cd)
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala
index dbf9beb..76fc1fc 100644
--- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala
+++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala
@@ -116,9 +116,9 @@ object SparkQueryTest {
                    |== Results ==
                    |${
                     sideBySide(
-                        s"== Kylin Answer - ${kylinAnswer.size} ==" +:
+                        s"== Expected Answer - ${kylinAnswer.size} ==" +:
                           kylinResults.map(_.toString()),
-                        s"== Spark Answer - ${sparkAnswer.size} ==" +:
+                        s"== Kylin Answer - ${sparkAnswer.size} ==" +:
                           sparkResults.map(_.toString())
                     ).mkString("\n")
                     }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
index d1d81b9..4907af3 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
@@ -18,11 +18,12 @@
 package org.apache.kylin.query.runtime.plans
 
 import org.apache.calcite.DataContext
+import org.apache.calcite.rel.core.Aggregate
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.sql.SqlKind
 import org.apache.kylin.metadata.model.FunctionDesc
 import org.apache.kylin.query.relnode.{KylinAggregateCall, OLAPAggregateRel}
-import org.apache.kylin.query.runtime.{AggArgc, RuntimeHelper, SparkOperation}
+import org.apache.kylin.query.runtime.RuntimeHelper
 import org.apache.kylin.query.SchemaProcessor
 import org.apache.spark.sql.KylinFunctions._
 import org.apache.spark.sql._
@@ -61,7 +62,11 @@ object AggregatePlan extends LogEx {
     } else {
       dataFrame = genFiltersWhenIntersectCount(rel, dataFrame)
       val aggList = buildAgg(dataFrame.schema, rel)
-      SparkOperation.agg(AggArgc(dataFrame, groupList, aggList))
+      val groupSets = rel.getGroupSets.asScala
+        .map(groupSet => groupSet.asScala.map(groupId => col(schemaNames.apply(groupId))).toList)
+        .toList
+      SparkOperation.agg(AggArgc(dataFrame, groupList, aggList, groupSets,
+        rel.getGroupType() == Aggregate.Group.SIMPLE))
     }
   }
 
@@ -167,11 +172,7 @@ object AggregatePlan extends LogEx {
           case SqlKind.SINGLE_VALUE.sql =>
             first(argNames.head).alias(aggName)
           case FunctionDesc.FUNC_GROUPING =>
-            if (rel.getGroupSet.get(call.getArgList.get(0))) {
-              lit(0).alias(aggName)
-            } else {
-              lit(1).alias(aggName)
-            }
+            grouping(argNames.head).alias(aggName)
           case _ =>
             throw new IllegalArgumentException(
               s"""Unsupported function name $funcName""")
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala
index 3271f39..3d8fa02 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala
@@ -18,8 +18,7 @@
 package org.apache.kylin.query.runtime.plans
 
 import org.apache.kylin.query.relnode.OLAPValuesRel
-import org.apache.kylin.query.runtime.SparkOperation
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Row, SparkOperation}
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.utils.SparkTypeUtil
 
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkOperation.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparkOperation.scala
similarity index 75%
rename from kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkOperation.scala
rename to kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparkOperation.scala
index 52d407e..44791df 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkOperation.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparkOperation.scala
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kylin.query.runtime
+package org.apache.spark.sql
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -39,7 +39,16 @@ object SparkOperation {
   }
 
   def agg(aggArgc: AggArgc): DataFrame = {
-    if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty) {
+    if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty &&
+      !aggArgc.isSimpleGroup && aggArgc.groupSets.nonEmpty) {
+      // for grouping sets, group by cube or group by rollup
+      val groupingSets = GroupingSets(aggArgc.groupSets.map(groupSet => groupSet.map(_.expr)),
+        aggArgc.group.map(_.expr),
+        aggArgc.dataFrame.queryExecution.logical,
+        aggArgc.group.map(_.named) ++ aggArgc.agg.map(_.named)
+      )
+      Dataset.ofRows(aggArgc.dataFrame.sparkSession, groupingSets)
+    } else if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty) {
       aggArgc.dataFrame
         .groupBy(aggArgc.group: _*)
         .agg(aggArgc.agg.head, aggArgc.agg.drop(1): _*)
@@ -53,4 +62,5 @@ object SparkOperation {
   }
 }
 
-case class AggArgc(dataFrame: DataFrame, group: List[Column], agg: List[Column])
\ No newline at end of file
+case class AggArgc(dataFrame: DataFrame, group: List[Column], agg: List[Column],
+                   groupSets: List[List[Column]], isSimpleGroup: Boolean)
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index 02c6e8a..ce27ff2 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -172,10 +172,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
             //tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_extended_column"));
 
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_function"));
-
-            // Not support yet
-            //tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_grouping"));
-
+            tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_grouping"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_h2"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_hive"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_intersect_count"));