You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/07 19:28:13 UTC

carbondata git commit: [CARBONDATA-1866] refactored CarbonLateDecodeRule to split different rules

Repository: carbondata
Updated Branches:
  refs/heads/master 29dc30280 -> 6b7217a8d


[CARBONDATA-1866] refactored CarbonLateDecodeRule to split different rules

This closes #1623


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6b7217a8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6b7217a8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6b7217a8

Branch: refs/heads/master
Commit: 6b7217a8d47aa2606859b92e4b03a5597b6083c9
Parents: 29dc302
Author: rahulforallp <ra...@knoldus.in>
Authored: Wed Dec 6 15:22:44 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Dec 8 00:57:47 2017 +0530

----------------------------------------------------------------------
 .../spark/sql/optimizer/CarbonIUDRule.scala     |  53 ++++++++
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 133 ++++++-------------
 .../sql/optimizer/CarbonUDFTransformRule.scala  |  68 ++++++++++
 .../src/main/spark2.1/CarbonSessionState.scala  |  22 ++-
 .../src/main/spark2.2/CarbonSessionState.scala  |   6 +-
 5 files changed, 185 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
new file mode 100644
index 0000000..7300fe9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.optimizer
+
+import org.apache.spark.sql.ProjectForUpdate
+import org.apache.spark.sql.catalyst.expressions.{NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand
+
+/**
+ * Rule for IUD operations: rewrites ProjectForUpdate into CarbonProjectForUpdateCommand.
+ */
+class CarbonIUDRule extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+      processPlan(plan)
+  }
+
+  private def processPlan(plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
+        var isTransformed = false
+        val newPlan = updatePlan transform {
+          case Project(pList, child) if !isTransformed =>
+            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
+              .splitAt(pList.size - cols.size)
+            val diff = cols.diff(dest.map(_.name.toLowerCase))
+            if (diff.nonEmpty) {
+              sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table ${ table.tableName }")
+            }
+            isTransformed = true
+            Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
+        }
+        CarbonProjectForUpdateCommand(
+          newPlan, table.tableIdentifier.database, table.tableIdentifier.table)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 2e39f5e..764891b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -46,35 +46,14 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
  */
 class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-  private var relations: Seq[CarbonDecoderRelation] = _
 
-  private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
-    plan collect {
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        CarbonDecoderRelation(l.attributeMap,
-          l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
-    }
-  }
+  private var relations: Seq[CarbonDecoderRelation] = _
 
   def apply(plan: LogicalPlan): LogicalPlan = {
-    relations = collectCarbonRelation(plan)
-    if (relations.nonEmpty && !isOptimized(plan)) {
-      // In case scalar subquery skip the transformation and update the flag.
-      if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) {
-        relations.foreach{carbonDecoderRelation =>
-          if (carbonDecoderRelation.carbonRelation.isSubquery.nonEmpty) {
-            carbonDecoderRelation.carbonRelation.isSubquery.remove(0)
-          }
-        }
-        LOGGER.info("Skip CarbonOptimizer for scalar/predicate sub query")
-        return plan
-      }
-      LOGGER.info("Starting to optimize plan")
-      val iudPlan = processPlan(plan)
-      val udfTransformedPlan = pushDownUDFToJoinLeftRelation(iudPlan)
+    if (checkIfRuleNeedToBeApplied(plan, true)) {
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
-      val result = transformCarbonPlan(udfTransformedPlan, relations)
+      val result = transformCarbonPlan(plan, relations)
       queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
         System.currentTimeMillis)
       recorder.recordStatistics(queryStatistic)
@@ -86,62 +65,37 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
-    val output = plan.transform {
-      case proj@Project(cols, Join(
-      left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
-        var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
-        var udfExists = false
-        val newCols = cols.map {
-          case a@Alias(s: ScalaUDF, name)
-            if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-               name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
-            udfExists = true
-            projectionToBeAdded :+= a
-            AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
-          case other => other
-        }
-        if (udfExists) {
-          val newLeft = left match {
-            case Project(columns, logicalPlan) =>
-              Project(columns ++ projectionToBeAdded, logicalPlan)
-            case filter: Filter =>
-              Project(filter.output ++ projectionToBeAdded, filter)
-            case relation: LogicalRelation =>
-              Project(relation.output ++ projectionToBeAdded, relation)
-            case other => other
-          }
-          Project(newCols, Join(newLeft, right, jointype, condition))
-        } else {
-          proj
+  def checkIfRuleNeedToBeApplied(plan: LogicalPlan, removeSubQuery: Boolean = false): Boolean = {
+    relations = collectCarbonRelation(plan);
+    if (relations.nonEmpty && !isOptimized(plan)) {
+      // For a scalar/predicate subquery, skip the transformation and update the flag.
+      if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) {
+        if (removeSubQuery) {
+          relations.foreach { carbonDecoderRelation =>
+            if (carbonDecoderRelation.carbonRelation.isSubquery.nonEmpty) {
+              carbonDecoderRelation.carbonRelation.isSubquery.remove(0)
+            }
+          }
         }
-      case other => other
+        LOGGER.info("skip CarbonOptimizer for scalar/predicate sub query")
+        return false
+      }
+      true
+    } else {
+      LOGGER.info("skip CarbonOptimizer")
+      false
     }
-    output
   }
 
-  private def processPlan(plan: LogicalPlan): LogicalPlan = {
-    plan transform {
-      case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
-        var isTransformed = false
-        val newPlan = updatePlan transform {
-          case Project(pList, child) if !isTransformed =>
-            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
-              .splitAt(pList.size - cols.size)
-            val diff = cols.diff(dest.map(_.name.toLowerCase))
-            if (diff.nonEmpty) {
-              sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
-            }
-            isTransformed = true
-            Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
-        }
-        CarbonProjectForUpdateCommand(
-          newPlan, table.tableIdentifier.database, table.tableIdentifier.table)
+  private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
+    plan collect {
+      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        CarbonDecoderRelation(l.attributeMap,
+          l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
     }
   }
 
-
-  def isOptimized(plan: LogicalPlan): Boolean = {
+  private def isOptimized(plan: LogicalPlan): Boolean = {
     plan find {
       case cd: CarbonDictionaryCatalystDecoder => true
       case other => false
@@ -150,7 +104,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
 
   case class ExtraNodeInfo(var hasCarbonRelation: Boolean)
 
-  def fillNodeInfo(
+  private def fillNodeInfo(
       plan: LogicalPlan,
       extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
     plan match {
@@ -573,6 +527,22 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     updateProjection(updateTempDecoder(transFormedPlan, aliasMap, attrMap))
   }
 
+  private def isDictionaryEncoded(attribute: Attribute,
+      attributeMap: util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
+      aliasMap: CarbonAliasDecoderRelation): Boolean = {
+
+    val uattr = aliasMap.getOrElse(attribute, attribute)
+    val relation = Option(attributeMap.get(AttributeReferenceWrapper(uattr)))
+    if (relation.isDefined) {
+      relation.get.dictionaryMap.get(uattr.name) match {
+        case Some(true) => true
+        case _ => false
+      }
+    } else {
+      false
+    }
+  }
+
   private def updateTempDecoder(plan: LogicalPlan,
       aliasMapOriginal: CarbonAliasDecoderRelation,
       attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]):
@@ -818,21 +788,6 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  private def isDictionaryEncoded(attr: Attribute,
-      attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
-      aliasMap: CarbonAliasDecoderRelation): Boolean = {
-    val uAttr = aliasMap.getOrElse(attr, attr)
-    val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr)))
-    if (relation.isDefined) {
-      relation.get.dictionaryMap.get(uAttr.name) match {
-        case Some(true) => true
-        case _ => false
-      }
-    } else {
-      false
-    }
-  }
-
   def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = {
     var present = false
     plan collect {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
new file mode 100644
index 0000000..c080cd9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, PredicateHelper,
+ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.StringType
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+class CarbonUDFTransformRule extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+      pushDownUDFToJoinLeftRelation(plan)
+  }
+
+  private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
+    val output = plan.transform {
+      case proj@Project(cols, Join(
+      left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
+        var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
+        var udfExists = false
+        val newCols = cols.map {
+          case a@Alias(s: ScalaUDF, name)
+            if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+               name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+            udfExists = true
+            projectionToBeAdded :+= a
+            AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
+          case other => other
+        }
+        if (udfExists) {
+          val newLeft = left match {
+            case Project(columns, logicalPlan) =>
+              Project(columns ++ projectionToBeAdded, logicalPlan)
+            case filter: Filter =>
+              Project(filter.output ++ projectionToBeAdded, filter)
+            case relation: LogicalRelation =>
+              Project(relation.output ++ projectionToBeAdded, relation)
+            case other => other
+          }
+          Project(newCols, Join(newLeft, right, jointype, condition))
+        } else {
+          proj
+        }
+      case other => other
+    }
+    output
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 7113e63..8900847 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -17,8 +17,6 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
@@ -28,12 +26,14 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -144,13 +144,23 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
 
   override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
 
-  experimentalMethods.extraStrategies =
+  experimentalMethods.extraStrategies = extraStrategies
+
+  experimentalMethods.extraOptimizations = extraOptimizations
+
+  def extraStrategies: Seq[Strategy] = {
     Seq(
       new StreamingTableStrategy(sparkSession),
       new CarbonLateDecodeStrategy,
       new DDLStrategy(sparkSession)
     )
-  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+  }
+
+  def extraOptimizations: Seq[Rule[LogicalPlan]] = {
+    Seq(new CarbonIUDRule,
+      new CarbonUDFTransformRule,
+      new CarbonLateDecodeRule)
+  }
 
   override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index e10feb1..2ba6d09 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStr
 import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SQLConf, SessionState}
-import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
 import org.apache.spark.sql.types.DecimalType
 
@@ -170,7 +170,9 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
         new CarbonLateDecodeStrategy,
         new DDLStrategy(sparkSession)
     )
-  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
 
   /**
    * Internal catalog for managing table and database states.