You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/01 06:44:23 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4905 Support
limit .. offset ... in spark query engine
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 01036af KYLIN-4905 Support limit .. offset ... in spark query engine
01036af is described below
commit 01036afd898e313a7d3b31f326dc780fd47f8fd0
Author: zhengshengjun <sh...@sina.com>
AuthorDate: Fri Feb 19 08:11:09 2021 +0800
KYLIN-4905 Support limit .. offset ... in spark query engine
---
.../src/test/resources/query/sql_limit/query00.sql | 24 ++++++++++++++++++++
.../sql_limit/query00.sql.expected/._SUCCESS.crc | Bin 0 -> 8 bytes
...90bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc | Bin 0 -> 12 bytes
.../query/sql_limit/query00.sql.expected/_SUCCESS | 0
...0-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv | 6 +++++
.../kylin/query/runtime/plans/LimitPlan.scala | 2 ++
.../kylin/engine/spark2/NBuildAndQueryTest.java | 1 +
.../org/apache/kylin/query/exec/SparkExec.java | 18 +++++++++++++--
.../apache/kylin/query/relnode/OLAPTableScan.java | 25 +++++++++++++++++++++
9 files changed, 74 insertions(+), 2 deletions(-)
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql b/kylin-it/src/test/resources/query/sql_limit/query00.sql
new file mode 100644
index 0000000..da7af83
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query00.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select slr_segment_cd, sum(price)
+from test_kylin_fact
+group by slr_segment_cd
+order by sum(price) desc
+limit 4 offset 2
+;{"scanRowCount":300,"scanBytes":0,"scanFiles":1,"cuboidId":14336}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc differ
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc
new file mode 100644
index 0000000..8134a78
Binary files /dev/null and b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc differ
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/_SUCCESS b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv
new file mode 100644
index 0000000..cc69342
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv
@@ -0,0 +1,6 @@
+13,597780.2600
+11,592133.2300
+12,570202.3700
+14,570158.2000
+5,567907.4600
+16,556006.0200
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala
index beb4e91..f5ff285 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala
@@ -41,6 +41,8 @@ object LimitPlan {
val offset = BigDecimal(rel.localOffset.accept(visitor).toString).toInt
inputs
.get(0)
+ //TODO KYLIN-4905 currently spark doesn't support limit...offset, support this in kylin server side
+ .limit(offset + limit)
//.limitRange(offset, offset + limit)
}
}
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index 537980f..dc84886 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -200,6 +200,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_unionall"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_values"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_window"));
+ tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_limit"));
}
logger.info("Total {} tasks.", tasks.size());
return tasks;
diff --git a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
index 03c5837..8571ab1 100644
--- a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
+++ b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
@@ -22,8 +22,10 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.query.relnode.OLAPLimitRel;
import org.apache.kylin.query.relnode.OLAPRel;
public class SparkExec {
@@ -37,7 +39,13 @@ public class SparkExec {
RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType();
try {
Enumerable<Object[]> computer = QueryEngineFactory.compute(dataContext, olapRel, rowType);
- return computer;
+ //TODO KYLIN-4905 currently spark doesn't support limit...offset.., support this in kylin server side
+ if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) {
+ RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset;
+ return computer.skip(Integer.valueOf(literal.getValue().toString()));
+ } else {
+ return computer;
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -52,7 +60,13 @@ public class SparkExec {
RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType();
try {
Enumerable<Object> objects = QueryEngineFactory.computeSCALA(dataContext, olapRel, rowType);
- return objects;
+ //TODO KYLIN-4905 currently spark doesn't support limit...offset.., support this in kylin server side
+ if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) {
+ RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset;
+ return objects.skip(Integer.valueOf(literal.getValue().toString()));
+ } else {
+ return objects;
+ }
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
index f7a363f..bbb4042 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
@@ -62,6 +62,7 @@ import org.apache.calcite.rel.rules.ReduceExpressionsRule;
import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.rel.rules.SortJoinTransposeRule;
import org.apache.calcite.rel.rules.SortUnionTransposeRule;
+import org.apache.calcite.rel.rules.SortProjectTransposeRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -211,6 +212,30 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel {
// see Dec 26th email @ http://mail-archives.apache.org/mod_mbox/calcite-dev/201412.mbox/browser
planner.removeRule(ExpandConversionRule.INSTANCE);
+
+ /*** TODO KYLIN-4905
+ * Spark doesn't support LIMIT ... OFFSET ..., so we apply the offset in the Kylin query server.
+ * The key is to keep OLAPLimitRel as the root RelNode, then return only the rows indexed from (offset) to (offset + limit).
+ * But SortProjectTransposeRule breaks this invariant, because it transposes Sort and Project (moving the Project above the Limit).
+ * eg: select sum(price), seller_id from kylin_sales group by seller_id order by sum(price) limit 10 offset 3
+
+ 1. Calcite optimized plan with SortProjectTransposeRule enabled:
+ OLAPProjectRel
+ |_OLAPLimitRel (offset=3,fetch=10)
+ |_OLAPSortRel
+ |_OLAPAggregateRel
+ |_OLAPProjectRel
+ |_OLAPTableScan
+
+ 2. Calcite optimized plan with SortProjectTransposeRule removed:
+ OLAPLimitRel (offset=3,fetch=10)
+ |_OLAPSortRel
+ |_ OLAPAggregateRel
+ |_ OLAPProjectRel
+ |_OLAPTableScan
+
+ * ***/
+ planner.removeRule(SortProjectTransposeRule.INSTANCE);
}
protected void addRules(final RelOptPlanner planner, List<String> rules) {