You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/10/23 06:23:32 UTC

[carbondata] branch master updated: [CARBONDATA-4306] Fix Query Performance issue for Spark 3.1

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8953cde  [CARBONDATA-4306] Fix Query Performance issue for Spark 3.1
8953cde is described below

commit 8953cde6ad93ddd3622c60cd48117acfbbee56a2
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Thu Sep 30 14:41:35 2021 +0530

    [CARBONDATA-4306] Fix Query Performance issue for Spark 3.1
    
    Why is this PR needed?
    Currently, with Spark 3.1, some rules are applied many times, resulting in performance degradation.
    
    What changes were proposed in this PR?
    Changed the rule-application strategy from FixedPoint to Once, and made CarbonOptimizer extend SparkOptimizer directly, so the same rules are not applied many times.
    
    This Closes #4229
---
 .../execution/strategy/CarbonSourceStrategy.scala  |  2 +-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    | 23 ++++++----------------
 2 files changed, 7 insertions(+), 18 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index 009d8c7..8a0779f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -152,7 +152,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
     var partitions : (Seq[CatalogTablePartition], Seq[PartitionSpec], Seq[Expression]) =
       (null, null, Seq.empty)
     var filterPredicates = allPredicates
-    if(names.nonEmpty) {
+    if(names.nonEmpty && partitionsFilter.nonEmpty) {
       partitions = CarbonFilters.getCatalogTablePartitions(
         partitionsFilter.filterNot(e => e.find(_.isInstanceOf[PlanExpression[_]]).isDefined),
         SparkSession.getActiveSession.get,
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 25a27ad..eaceb85 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
@@ -238,33 +239,21 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
 }
 
 class CarbonOptimizer(session: SparkSession, optimizer: Optimizer) extends
-  Optimizer(session.sessionState.catalogManager) {
+  SparkOptimizer(session.sessionState.catalogManager,
+    session.sessionState.catalog,
+    session.sessionState.experimentalMethods) {
 
   private lazy val mvRules = Seq(Batch("Materialized View Optimizers", Once,
     Seq(new MVRewriteRule(session)): _*))
 
-  private lazy val iudRule = Batch("IUD Optimizers", fixedPoint,
+  private lazy val iudRule = Batch("IUD Optimizers", Once,
     Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonFileIndexReplaceRule()): _*)
 
   private lazy val secondaryIndexRule = Batch("SI Optimizers", Once,
     Seq(new CarbonSITransformationRule(session)): _*)
 
   override def defaultBatches: Seq[Batch] = {
-    mvRules ++ convertedBatch() :+ iudRule :+ secondaryIndexRule
+    mvRules ++ super.defaultBatches :+ iudRule :+ secondaryIndexRule
   }
 
-  def convertedBatch(): Seq[Batch] = {
-    optimizer.batches.map { batch =>
-      Batch(
-        batch.name,
-        batch.strategy match {
-          case optimizer.Once =>
-            Once
-          case _: optimizer.FixedPoint =>
-            fixedPoint
-        },
-        batch.rules: _*
-      )
-    }
-  }
 }