You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/02/07 09:56:55 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4888, Performance optimization of union query with spark engine

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new a4a8480  KYLIN-4888, Performance optimization of union query with spark engine
a4a8480 is described below

commit a4a8480a05449b7ab19c7eace096dc3e39697dcb
Author: feng.zhu <fi...@outlook.com>
AuthorDate: Wed Jan 27 20:06:28 2021 +0800

    KYLIN-4888, Performance optimization of union query with spark engine
---
 .../apache/kylin/query/runtime/CalciteToSparkPlaner.scala   |  8 ++++----
 .../scala/org/apache/kylin/query/runtime/SparkEngine.java   |  1 -
 .../org/apache/kylin/query/runtime/plans/UnionPlan.scala    | 13 +++++--------
 .../main/java/org/apache/kylin/query/exec/SparkExec.java    |  4 ----
 4 files changed, 9 insertions(+), 17 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
index 4d966d9..4642b52 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
@@ -23,8 +23,8 @@ import java.util
 import org.apache.kylin.shaded.com.google.common.collect.Lists
 import org.apache.calcite.DataContext
 import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.kylin.query.relnode.{OLAPAggregateRel, OLAPFilterRel, OLAPJoinRel, OLAPLimitRel, OLAPProjectRel, OLAPSortRel, OLAPTableScan, OLAPUnionRel, OLAPValuesRel, OLAPWindowRel}
-import org.apache.kylin.query.runtime.plans.{AggregatePlan, FilterPlan, LimitPlan, ProjectPlan, SortPlan, TableScanPlan, ValuesPlan, WindowPlan}
+import org.apache.kylin.query.relnode._
+import org.apache.kylin.query.runtime.plans._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
 
@@ -95,14 +95,14 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log
           val right = stack.pop()
           val left = stack.pop()
           logTime("join") {
-            plans.JoinPlan.join(Lists.newArrayList(left, right), rel)
+            JoinPlan.join(Lists.newArrayList(left, right), rel)
           }
         }
       case rel: OLAPUnionRel =>
         val size = unionStack.pop()
         val java = Range(0, stack.size() - size).map(a => stack.pop()).asJava
         logTime("union") {
-          plans.UnionPlan.union(Lists.newArrayList(java), rel, dataContext)
+          UnionPlan.union(Lists.newArrayList(java), rel, dataContext)
         }
       case rel: OLAPValuesRel =>
         logTime("values") {
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index ed51427..c2b0cc4 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -57,7 +57,6 @@ public class SparkEngine implements QueryEngine {
         log.trace("Begin planning spark plan.");
         long start = System.currentTimeMillis();
         CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext);
-        long t = System.currentTimeMillis();
         calciteToSparkPlaner.go(relNode);
         long takeTime = System.currentTimeMillis() - start;
         log.trace("Plan take {} ms", takeTime);
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala
index ac45f0e..88647de 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala
@@ -31,14 +31,11 @@ object UnionPlan {
     dataContext: DataContext): DataFrame = {
     var df = inputs.get(0)
     val drop = inputs.asScala.drop(1)
-    if (rel.all) {
-      for (other <- drop) {
-        df = df.union(other)
-      }
-    } else {
-      for (other <- drop) {
-        df = df.union(other).distinct()
-      }
+    for (other <- drop) {
+      df = df.union(other)
+    }
+    if (!rel.all) {
+      df = df.distinct()
     }
     df
   }
diff --git a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
index b469446..03c5837 100644
--- a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
+++ b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
@@ -25,13 +25,9 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.query.relnode.OLAPRel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class SparkExec {
 
-    private static final Logger logger = LoggerFactory.getLogger(SparkExec.class);
-
     public static Enumerable<Object[]> collectToEnumerable(DataContext dataContext) {
         if (BackdoorToggles.getPrepareOnly()) {
             return Linq4j.emptyEnumerable();