You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/01 06:44:23 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4905 Support limit .. offset ... in spark query engine

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 01036af  KYLIN-4905 Support limit .. offset ... in spark query engine
01036af is described below

commit 01036afd898e313a7d3b31f326dc780fd47f8fd0
Author: zhengshengjun <sh...@sina.com>
AuthorDate: Fri Feb 19 08:11:09 2021 +0800

    KYLIN-4905 Support limit .. offset ... in spark query engine
---
 .../src/test/resources/query/sql_limit/query00.sql |  24 ++++++++++++++++++++
 .../sql_limit/query00.sql.expected/._SUCCESS.crc   | Bin 0 -> 8 bytes
 ...90bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc | Bin 0 -> 12 bytes
 .../query/sql_limit/query00.sql.expected/_SUCCESS  |   0
 ...0-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv |   6 +++++
 .../kylin/query/runtime/plans/LimitPlan.scala      |   2 ++
 .../kylin/engine/spark2/NBuildAndQueryTest.java    |   1 +
 .../org/apache/kylin/query/exec/SparkExec.java     |  18 +++++++++++++--
 .../apache/kylin/query/relnode/OLAPTableScan.java  |  25 +++++++++++++++++++++
 9 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql b/kylin-it/src/test/resources/query/sql_limit/query00.sql
new file mode 100644
index 0000000..da7af83
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query00.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select slr_segment_cd, sum(price)
+from test_kylin_fact
+group by slr_segment_cd
+order by sum(price) desc
+limit 4 offset 2
+;{"scanRowCount":300,"scanBytes":0,"scanFiles":1,"cuboidId":14336}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc differ
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc
new file mode 100644
index 0000000..8134a78
Binary files /dev/null and b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc differ
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/_SUCCESS b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv
new file mode 100644
index 0000000..cc69342
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv
@@ -0,0 +1,6 @@
+13,597780.2600
+11,592133.2300
+12,570202.3700
+14,570158.2000
+5,567907.4600
+16,556006.0200
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala
index beb4e91..f5ff285 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala
@@ -41,6 +41,8 @@ object LimitPlan {
       val offset = BigDecimal(rel.localOffset.accept(visitor).toString).toInt
       inputs
         .get(0)
+        //TODO KYLIN-4905 Spark does not support LIMIT ... OFFSET, so fetch (offset + limit) rows here and let the Kylin query server skip the first (offset) rows
+        .limit(offset + limit)
         //.limitRange(offset, offset + limit)
     }
   }
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index 537980f..dc84886 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -200,6 +200,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_unionall"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_values"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_window"));
+            tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_limit"));
         }
         logger.info("Total {} tasks.", tasks.size());
         return tasks;
diff --git a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
index 03c5837..8571ab1 100644
--- a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
+++ b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
@@ -22,8 +22,10 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Linq4j;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.query.relnode.OLAPLimitRel;
 import org.apache.kylin.query.relnode.OLAPRel;
 
 public class SparkExec {
@@ -37,7 +39,13 @@ public class SparkExec {
         RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType();
         try {
             Enumerable<Object[]> computer = QueryEngineFactory.compute(dataContext, olapRel, rowType);
-            return computer;
+            //TODO KYLIN-4905 Spark does not support LIMIT ... OFFSET, so apply the offset here by skipping the first (offset) rows of the result
+            if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) {
+                RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset;
+                return computer.skip(Integer.valueOf(literal.getValue().toString()));
+            } else {
+                return computer;
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -52,7 +60,13 @@ public class SparkExec {
         RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType();
         try {
             Enumerable<Object> objects = QueryEngineFactory.computeSCALA(dataContext, olapRel, rowType);
-            return objects;
+            //TODO KYLIN-4905 Spark does not support LIMIT ... OFFSET, so apply the offset here by skipping the first (offset) rows of the result
+            if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) {
+                RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset;
+                return objects.skip(Integer.valueOf(literal.getValue().toString()));
+            } else {
+                return objects;
+            }
 
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
index f7a363f..bbb4042 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
@@ -62,6 +62,7 @@ import org.apache.calcite.rel.rules.ReduceExpressionsRule;
 import org.apache.calcite.rel.rules.SemiJoinRule;
 import org.apache.calcite.rel.rules.SortJoinTransposeRule;
 import org.apache.calcite.rel.rules.SortUnionTransposeRule;
+import org.apache.calcite.rel.rules.SortProjectTransposeRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -211,6 +212,30 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel {
 
         // see Dec 26th email @ http://mail-archives.apache.org/mod_mbox/calcite-dev/201412.mbox/browser
         planner.removeRule(ExpandConversionRule.INSTANCE);
+
+        /*** TODO KYLIN-4905
+         * Spark does not support LIMIT ... OFFSET, so the offset is applied in the Kylin query server instead.
+         * This requires OLAPLimitRel to stay the root RelNode, so the server can take the result rows from index (offset) to (offset + limit).
+         * SortProjectTransposeRule breaks this requirement: it transposes Sort and Project, which can leave an OLAPProjectRel above the OLAPLimitRel.
+         * e.g.: select sum(price), seller_id from kylin_sales group by seller_id order by sum(price) limit 10 offset 3
+
+            1. Calcite optimized plan with SortProjectTransposeRule enabled:
+                OLAPProjectRel
+                 |_OLAPLimitRel (offset=3,fetch=10)
+                   |_OLAPSortRel
+                     |_OLAPAggregateRel
+                       |_OLAPProjectRel
+                         |_OLAPTableScan
+
+            2. Calcite optimized plan with SortProjectTransposeRule removed:
+                OLAPLimitRel  (offset=3,fetch=10)
+                  |_OLAPSortRel
+                    |_ OLAPAggregateRel
+                      |_ OLAPProjectRel
+                        |_OLAPTableScan
+
+         * ***/
+        planner.removeRule(SortProjectTransposeRule.INSTANCE);
     }
 
     protected void addRules(final RelOptPlanner planner, List<String> rules) {