You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/02/07 09:56:55 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4888, Performance optimization of union query with spark engine
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new a4a8480 KYLIN-4888, Performance optimization of union query with spark engine
a4a8480 is described below
commit a4a8480a05449b7ab19c7eace096dc3e39697dcb
Author: feng.zhu <fi...@outlook.com>
AuthorDate: Wed Jan 27 20:06:28 2021 +0800
KYLIN-4888, Performance optimization of union query with spark engine
---
.../apache/kylin/query/runtime/CalciteToSparkPlaner.scala | 8 ++++----
.../scala/org/apache/kylin/query/runtime/SparkEngine.java | 1 -
.../org/apache/kylin/query/runtime/plans/UnionPlan.scala | 13 +++++--------
.../main/java/org/apache/kylin/query/exec/SparkExec.java | 4 ----
4 files changed, 9 insertions(+), 17 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
index 4d966d9..4642b52 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
@@ -23,8 +23,8 @@ import java.util
import org.apache.kylin.shaded.com.google.common.collect.Lists
import org.apache.calcite.DataContext
import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.kylin.query.relnode.{OLAPAggregateRel, OLAPFilterRel, OLAPJoinRel, OLAPLimitRel, OLAPProjectRel, OLAPSortRel, OLAPTableScan, OLAPUnionRel, OLAPValuesRel, OLAPWindowRel}
-import org.apache.kylin.query.runtime.plans.{AggregatePlan, FilterPlan, LimitPlan, ProjectPlan, SortPlan, TableScanPlan, ValuesPlan, WindowPlan}
+import org.apache.kylin.query.relnode._
+import org.apache.kylin.query.runtime.plans._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
@@ -95,14 +95,14 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log
val right = stack.pop()
val left = stack.pop()
logTime("join") {
- plans.JoinPlan.join(Lists.newArrayList(left, right), rel)
+ JoinPlan.join(Lists.newArrayList(left, right), rel)
}
}
case rel: OLAPUnionRel =>
val size = unionStack.pop()
val java = Range(0, stack.size() - size).map(a => stack.pop()).asJava
logTime("union") {
- plans.UnionPlan.union(Lists.newArrayList(java), rel, dataContext)
+ UnionPlan.union(Lists.newArrayList(java), rel, dataContext)
}
case rel: OLAPValuesRel =>
logTime("values") {
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index ed51427..c2b0cc4 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -57,7 +57,6 @@ public class SparkEngine implements QueryEngine {
log.trace("Begin planning spark plan.");
long start = System.currentTimeMillis();
CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext);
- long t = System.currentTimeMillis();
calciteToSparkPlaner.go(relNode);
long takeTime = System.currentTimeMillis() - start;
log.trace("Plan take {} ms", takeTime);
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala
index ac45f0e..88647de 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala
@@ -31,14 +31,11 @@ object UnionPlan {
dataContext: DataContext): DataFrame = {
var df = inputs.get(0)
val drop = inputs.asScala.drop(1)
- if (rel.all) {
- for (other <- drop) {
- df = df.union(other)
- }
- } else {
- for (other <- drop) {
- df = df.union(other).distinct()
- }
+ for (other <- drop) {
+ df = df.union(other)
+ }
+ if (!rel.all) {
+ df = df.distinct()
}
df
}
diff --git a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
index b469446..03c5837 100644
--- a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
+++ b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
@@ -25,13 +25,9 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.query.relnode.OLAPRel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SparkExec {
- private static final Logger logger = LoggerFactory.getLogger(SparkExec.class);
-
public static Enumerable<Object[]> collectToEnumerable(DataContext dataContext) {
if (BackdoorToggles.getPrepareOnly()) {
return Linq4j.emptyEnumerable();