You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/07 19:28:13 UTC
carbondata git commit: [CARBONDATA-1866] refactored CarbonLateDecodeRule to split different rules
Repository: carbondata
Updated Branches:
refs/heads/master 29dc30280 -> 6b7217a8d
[CARBONDATA-1866] refactored CarbonLateDecodeRule to split different rules
This closes #1623
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6b7217a8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6b7217a8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6b7217a8
Branch: refs/heads/master
Commit: 6b7217a8d47aa2606859b92e4b03a5597b6083c9
Parents: 29dc302
Author: rahulforallp <ra...@knoldus.in>
Authored: Wed Dec 6 15:22:44 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Dec 8 00:57:47 2017 +0530
----------------------------------------------------------------------
.../spark/sql/optimizer/CarbonIUDRule.scala | 53 ++++++++
.../sql/optimizer/CarbonLateDecodeRule.scala | 133 ++++++-------------
.../sql/optimizer/CarbonUDFTransformRule.scala | 68 ++++++++++
.../src/main/spark2.1/CarbonSessionState.scala | 22 ++-
.../src/main/spark2.2/CarbonSessionState.scala | 6 +-
5 files changed, 185 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
new file mode 100644
index 0000000..7300fe9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.optimizer
+
+import org.apache.spark.sql.ProjectForUpdate
+import org.apache.spark.sql.catalyst.expressions.{NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand
+
+/**
+ * Rule specific for IUD operations
+ */
+class CarbonIUDRule extends Rule[LogicalPlan] with PredicateHelper {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ processPlan(plan)
+ }
+
+ private def processPlan(plan: LogicalPlan): LogicalPlan = {
+ plan transform {
+ case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
+ var isTransformed = false
+ val newPlan = updatePlan transform {
+ case Project(pList, child) if !isTransformed =>
+ val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
+ .splitAt(pList.size - cols.size)
+ val diff = cols.diff(dest.map(_.name.toLowerCase))
+ if (diff.nonEmpty) {
+ sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table ${ table.tableName }")
+ }
+ isTransformed = true
+ Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
+ }
+ CarbonProjectForUpdateCommand(
+ newPlan, table.tableIdentifier.database, table.tableIdentifier.table)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 2e39f5e..764891b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -46,35 +46,14 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
*/
class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- private var relations: Seq[CarbonDecoderRelation] = _
- private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
- plan collect {
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- CarbonDecoderRelation(l.attributeMap,
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
- }
- }
+ private var relations: Seq[CarbonDecoderRelation] = _
def apply(plan: LogicalPlan): LogicalPlan = {
- relations = collectCarbonRelation(plan)
- if (relations.nonEmpty && !isOptimized(plan)) {
- // In case scalar subquery skip the transformation and update the flag.
- if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) {
- relations.foreach{carbonDecoderRelation =>
- if (carbonDecoderRelation.carbonRelation.isSubquery.nonEmpty) {
- carbonDecoderRelation.carbonRelation.isSubquery.remove(0)
- }
- }
- LOGGER.info("Skip CarbonOptimizer for scalar/predicate sub query")
- return plan
- }
- LOGGER.info("Starting to optimize plan")
- val iudPlan = processPlan(plan)
- val udfTransformedPlan = pushDownUDFToJoinLeftRelation(iudPlan)
+ if (checkIfRuleNeedToBeApplied(plan, true)) {
val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
val queryStatistic = new QueryStatistic()
- val result = transformCarbonPlan(udfTransformedPlan, relations)
+ val result = transformCarbonPlan(plan, relations)
queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
System.currentTimeMillis)
recorder.recordStatistics(queryStatistic)
@@ -86,62 +65,37 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
}
}
- private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
- val output = plan.transform {
- case proj@Project(cols, Join(
- left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
- var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
- var udfExists = false
- val newCols = cols.map {
- case a@Alias(s: ScalaUDF, name)
- if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
- name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
- udfExists = true
- projectionToBeAdded :+= a
- AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
- case other => other
- }
- if (udfExists) {
- val newLeft = left match {
- case Project(columns, logicalPlan) =>
- Project(columns ++ projectionToBeAdded, logicalPlan)
- case filter: Filter =>
- Project(filter.output ++ projectionToBeAdded, filter)
- case relation: LogicalRelation =>
- Project(relation.output ++ projectionToBeAdded, relation)
- case other => other
- }
- Project(newCols, Join(newLeft, right, jointype, condition))
- } else {
- proj
+ def checkIfRuleNeedToBeApplied(plan: LogicalPlan, removeSubQuery: Boolean = false): Boolean = {
+ relations = collectCarbonRelation(plan);
+ if (relations.nonEmpty && !isOptimized(plan)) {
+ // In case scalar subquery skip the transformation and update the flag.
+ if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) {
+ if (removeSubQuery) {
+ relations.foreach { carbonDecoderRelation =>
+ if (carbonDecoderRelation.carbonRelation.isSubquery.nonEmpty) {
+ carbonDecoderRelation.carbonRelation.isSubquery.remove(0)
+ }
+ }
}
- case other => other
+ LOGGER.info("skip CarbonOptimizer for scalar/predicate sub query")
+ return false
+ }
+ true
+ } else {
+ LOGGER.info("skip CarbonOptimizer")
+ false
}
- output
}
- private def processPlan(plan: LogicalPlan): LogicalPlan = {
- plan transform {
- case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
- var isTransformed = false
- val newPlan = updatePlan transform {
- case Project(pList, child) if !isTransformed =>
- val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
- .splitAt(pList.size - cols.size)
- val diff = cols.diff(dest.map(_.name.toLowerCase))
- if (diff.nonEmpty) {
- sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
- }
- isTransformed = true
- Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
- }
- CarbonProjectForUpdateCommand(
- newPlan, table.tableIdentifier.database, table.tableIdentifier.table)
+ private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
+ plan collect {
+ case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ CarbonDecoderRelation(l.attributeMap,
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
}
}
-
- def isOptimized(plan: LogicalPlan): Boolean = {
+ private def isOptimized(plan: LogicalPlan): Boolean = {
plan find {
case cd: CarbonDictionaryCatalystDecoder => true
case other => false
@@ -150,7 +104,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
case class ExtraNodeInfo(var hasCarbonRelation: Boolean)
- def fillNodeInfo(
+ private def fillNodeInfo(
plan: LogicalPlan,
extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
plan match {
@@ -573,6 +527,22 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
updateProjection(updateTempDecoder(transFormedPlan, aliasMap, attrMap))
}
+ private def isDictionaryEncoded(attribute: Attribute,
+ attributeMap: util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
+ aliasMap: CarbonAliasDecoderRelation): Boolean = {
+
+ val uattr = aliasMap.getOrElse(attribute, attribute)
+ val relation = Option(attributeMap.get(AttributeReferenceWrapper(uattr)))
+ if (relation.isDefined) {
+ relation.get.dictionaryMap.get(uattr.name) match {
+ case Some(true) => true
+ case _ => false
+ }
+ } else {
+ false
+ }
+ }
+
private def updateTempDecoder(plan: LogicalPlan,
aliasMapOriginal: CarbonAliasDecoderRelation,
attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]):
@@ -818,21 +788,6 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
}
}
- private def isDictionaryEncoded(attr: Attribute,
- attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
- aliasMap: CarbonAliasDecoderRelation): Boolean = {
- val uAttr = aliasMap.getOrElse(attr, attr)
- val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr)))
- if (relation.isDefined) {
- relation.get.dictionaryMap.get(uAttr.name) match {
- case Some(true) => true
- case _ => false
- }
- } else {
- false
- }
- }
-
def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = {
var present = false
plan collect {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
new file mode 100644
index 0000000..c080cd9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, PredicateHelper,
+ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.StringType
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+class CarbonUDFTransformRule extends Rule[LogicalPlan] with PredicateHelper {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ pushDownUDFToJoinLeftRelation(plan)
+ }
+
+ private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
+ val output = plan.transform {
+ case proj@Project(cols, Join(
+ left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
+ var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
+ var udfExists = false
+ val newCols = cols.map {
+ case a@Alias(s: ScalaUDF, name)
+ if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+ name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+ udfExists = true
+ projectionToBeAdded :+= a
+ AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
+ case other => other
+ }
+ if (udfExists) {
+ val newLeft = left match {
+ case Project(columns, logicalPlan) =>
+ Project(columns ++ projectionToBeAdded, logicalPlan)
+ case filter: Filter =>
+ Project(filter.output ++ projectionToBeAdded, filter)
+ case relation: LogicalRelation =>
+ Project(relation.output ++ projectionToBeAdded, relation)
+ case other => other
+ }
+ Project(newCols, Join(newLeft, right, jointype, condition))
+ } else {
+ proj
+ }
+ case other => other
+ }
+ output
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 7113e63..8900847 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
@@ -28,12 +26,14 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -144,13 +144,23 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
- experimentalMethods.extraStrategies =
+ experimentalMethods.extraStrategies = extraStrategies
+
+ experimentalMethods.extraOptimizations = extraOptimizations
+
+ def extraStrategies: Seq[Strategy] = {
Seq(
new StreamingTableStrategy(sparkSession),
new CarbonLateDecodeStrategy,
new DDLStrategy(sparkSession)
)
- experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ }
+
+ def extraOptimizations: Seq[Rule[LogicalPlan]] = {
+ Seq(new CarbonIUDRule,
+ new CarbonUDFTransformRule,
+ new CarbonLateDecodeRule)
+ }
override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index e10feb1..2ba6d09 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStr
import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SQLConf, SessionState}
-import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
import org.apache.spark.sql.types.DecimalType
@@ -170,7 +170,9 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
new CarbonLateDecodeStrategy,
new DDLStrategy(sparkSession)
)
- experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+ new CarbonUDFTransformRule,
+ new CarbonLateDecodeRule)
/**
* Internal catalog for managing table and database states.