You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/21 05:52:20 UTC
[carbondata] branch master updated: [CARBONDATA-3309] MV datamap
supports Spark 2.1
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4d7c8ad [CARBONDATA-3309] MV datamap supports Spark 2.1
4d7c8ad is described below
commit 4d7c8ada98ed15511d0abff349b64522f047344b
Author: qiuchenjian <80...@qq.com>
AuthorDate: Sun Mar 17 20:04:48 2019 +0800
[CARBONDATA-3309] MV datamap supports Spark 2.1
[Problem]
MV datamap doesn't support Spark 2.1 version, so we need to support it
[Solution]
The following is the modification point and all MV test cases are passed on spark 2.1 version
The classes we can't access in the Spark 2.1 version:
(1). org.apache.spark.internal.Logging
(2). org.apache.spark.sql.internal.SQLConf
Solution: Create classes that extend the above classes
The classes that the Spark 2.1 version doesn't have:
(1). org.apache.spark.sql.catalyst.plans.logical.Subquery
(2). org.apache.spark.sql.catalyst.catalog.interface.HiveTableRelation
Solution: Use CatalogRelation instead, don't use it in LogicalPlanSignatureGenerator, and move the Subquery code into the carbon project
The methods that we can't access in the Spark 2.1 version:
(1). sparkSession.sessionState.catalog.lookupRelation
Solution: Add this method to SparkSQLUtil
The changes of some class
(1). org.apache.spark.sql.catalyst.expressions.SortOrder
(2). org.apache.spark.sql.catalyst.expressions.Cast
(3). org.apache.spark.sql.catalyst.plans.Statistics
Solution: Adapt to the new interfaces
The methods that the Spark 2.1 version doesn't have:
(1). normalizeExprId, canonicalized of org.apache.spark.sql.catalyst.plans.QueryPlan
(2). CASE_SENSITIVE of SQLConf
(3). STARSCHEMA_DETECTION of SQLConf
Solution: Don't use normalizeExprId or canonicalized, and don't use CASE_SENSITIVE or STARSCHEMA_DETECTION
Some logical plan optimization rules that the Spark 2.1 version doesn't have:
(1). SimplifyCreateMapOps
(2). SimplifyCreateArrayOps
(3). SimplifyCreateStructOps
(4). RemoveRedundantProject
(5). RemoveRedundantAliases
(6). PullupCorrelatedPredicates
(7). ReplaceDeduplicateWithAggregate
(8). EliminateView
Solution: Delete the code or move it into the carbon project
Generate the instances in SparkSQLUtil to adapt to the Spark 2.1 version
Query SQL passes the MV check in the Spark 2.1 version (CarbonSessionState)
This closes #3150
---
.../carbondata/mv/datamap/MVDataMapProvider.scala | 2 +-
.../apache/carbondata/mv/datamap/MVHelper.scala | 2 +-
.../apache/carbondata/mv/rewrite/MatchMaker.scala | 2 +-
.../mv/rewrite/SummaryDatasetCatalog.scala | 5 +-
.../carbondata/mv/rewrite/TestSQLSuite.scala | 4 +-
.../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala | 4 +-
.../mv/expressions/modular/subquery.scala | 13 ++-
.../mv/plans/modular/AggregatePushDown.scala | 8 +-
.../carbondata/mv/plans/modular/Harmonizer.scala | 2 +-
.../carbondata/mv/plans/modular/ModularPlan.scala | 8 +-
.../mv/plans/modular/ModularRelation.scala | 22 +---
.../carbondata/mv/plans/modular/Modularizer.scala | 2 +-
.../mv/plans/util/BirdcageOptimizer.scala | 10 +-
.../mv/plans/util/Logical2ModularExtractions.scala | 19 +--
.../carbondata/mv/plans/util/SQLBuildDSL.scala | 5 +-
.../carbondata/mv/plans/util/SQLBuilder.scala | 9 --
.../carbondata/mv/plans/util/Signature.scala | 2 +-
.../carbondata/mv/testutil/Tpcds_1_4_Tables.scala | 4 +-
.../carbondata/mv/plans/ModularToSQLSuite.scala | 4 +-
.../carbondata/mv/plans/SignatureSuite.scala | 4 +-
.../spark/sql/catalyst/analysis/EmptyRule.scala | 26 +++++
.../org/apache/spark/sql/util/SparkSQLUtil.scala | 113 +++++++++++++++++-
.../apache/spark/util/CarbonReflectionUtils.scala | 7 ++
.../src/main/scala/org/apache/spark/Logging.scala | 22 ++++
.../main/scala/org/apache/spark/sql/SQLConf.scala | 23 ++++
.../apache/spark/sql/CarbonToSparkAdapater.scala | 8 +-
.../sql/catalyst/catalog/HiveTableRelation.scala | 56 +++++++++
.../sql/catalyst/optimizer/MigrateOptimizer.scala | 129 +++++++++++++++++++++
.../sql/catalyst/plans/logical/Subquery.scala | 28 +++++
.../apache/spark/sql/hive/CarbonSessionState.scala | 19 ++-
30 files changed, 481 insertions(+), 81 deletions(-)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 7108bf8..5ffc46a 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -81,7 +81,7 @@ class MVDataMapProvider(
val identifier = dataMapSchema.getRelationIdentifier
val logicalPlan =
new FindDataSourceTable(sparkSession).apply(
- sparkSession.sessionState.catalog.lookupRelation(
+ SparkSQLUtil.sessionState(sparkSession).catalog.lookupRelation(
TableIdentifier(identifier.getTableName,
Some(identifier.getDatabaseName)))) match {
case s: SubqueryAlias => s.child
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 8baa924..9bed098 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -547,7 +547,7 @@ object MVHelper {
relation,
aliasMap,
keepAlias = false)
- SortOrder(expressions.head, s.direction, s.sameOrderExpressions)
+ SortOrder(expressions.head, s.direction)
}
// In case of limit it goes to other.
case other => other
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
index 2c5d8f4..493539b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.mv.rewrite
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.trees.TreeNode
abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 026d6b7..ea0bfea 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.core.datamap.DataMapCatalog
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
@@ -34,6 +35,7 @@ import org.apache.carbondata.mv.plans.modular.{Flags, ModularPlan, ModularRelati
import org.apache.carbondata.mv.plans.util.Signature
import org.apache.carbondata.mv.session.MVSession
+
/** Holds a summary logical plan */
private[mv] case class SummaryDataset(signature: Option[Signature],
plan: LogicalPlan,
@@ -114,7 +116,8 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized
val signature = modularPlan.signature
val identifier = dataMapSchema.getRelationIdentifier
- val output = new FindDataSourceTable(sparkSession).apply(sparkSession.sessionState.catalog
+ val output = new FindDataSourceTable(sparkSession)
+ .apply(SparkSQLUtil.sessionState(sparkSession).catalog
.lookupRelation(TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))))
.output
val relation = ModularRelation(identifier.getDatabaseName,
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
index 25f07e4..95450b2 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
@@ -20,15 +20,15 @@ package org.apache.carbondata.mv.rewrite
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.scalatest.BeforeAndAfter
-
import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
class TestSQLSuite extends ModularPlanTest with BeforeAndAfter {
import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._
val spark = sqlContext
val testHive = sqlContext.sparkSession
- val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+ val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
ignore("protypical mqo rewrite test") {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
index 882a43a..b30a131 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
@@ -20,8 +20,8 @@ package org.apache.carbondata.mv.rewrite
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.scalatest.BeforeAndAfter
-
import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
//import org.apache.spark.sql.catalyst.SQLBuilder
import java.io.{File, PrintWriter}
@@ -31,7 +31,7 @@ class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter {
val spark = sqlContext
val testHive = sqlContext.sparkSession
- val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+ val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
test("test using tpc-ds queries") {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
index cfe341a..e41c9ca 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
@@ -17,9 +17,9 @@
package org.apache.carbondata.mv.expressions.modular
-import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, AttributeSet, Expression, ExprId, LeafExpression, NamedExpression, OuterReference, PlanExpression, Predicate, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.mv.plans.modular.ModularPlan
@@ -53,7 +53,8 @@ abstract class ModularSubquery(
def canonicalize(attrs: AttributeSeq): ModularSubquery = {
// Normalize the outer references in the subquery plan.
val normalizedPlan = plan.transformAllExpressions {
- case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
+ case OuterReference(r) => OuterReference(SparkSQLUtil.
+ invokeQueryPlannormalizeExprId(r, attrs))
}
withNewPlan(normalizedPlan).canonicalized.asInstanceOf[ModularSubquery]
}
@@ -80,7 +81,7 @@ case class ScalarModularSubquery(
override lazy val canonicalized: Expression = {
ScalarModularSubquery(
- plan.canonicalized,
+ plan.canonicalizedDef,
children.map(_.canonicalized),
ExprId(0))
}
@@ -122,7 +123,7 @@ case class ModularListQuery(
override lazy val canonicalized: Expression = {
ModularListQuery(
- plan.canonicalized,
+ plan.canonicalizedDef,
children.map(_.canonicalized),
ExprId(0))
}
@@ -153,7 +154,7 @@ case class ModularExists(
override lazy val canonicalized: Expression = {
ModularExists(
- plan.canonicalized,
+ plan.canonicalizedDef,
children.map(_.canonicalized),
ExprId(0))
}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
index 77efaf7..a19066d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
@@ -19,7 +19,8 @@ package org.apache.carbondata.mv.plans.modular
import scala.collection._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, ExprId, Literal, NamedExpression}
+import org.apache.spark.sql.CarbonExpressions.MatchCast
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Divide, ExprId, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate._
trait AggregatePushDown { // self: ModularPlan =>
@@ -106,12 +107,11 @@ trait AggregatePushDown { // self: ModularPlan =>
} else {
Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
}
- case sum@AggregateExpression(Sum(Cast(expr, dataType, timeZoneId)), _, false, _)
- if expr.isInstanceOf[Attribute] =>
+ case sum@AggregateExpression(Sum(cast@MatchCast(expr, dataType)), _, false, _) =>
val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
.asInstanceOf[Attribute]
if (fact.outputSet.contains(tAttr)) {
- val sum1 = AggregateExpression(Sum(Cast(tAttr, dataType, timeZoneId)), sum.mode, false)
+ val sum1 = AggregateExpression(Sum(cast), sum.mode, false)
val alias = Alias(sum1, sum1.toString)()
val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index ebe8c8c..cb2043e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.mv.plans.modular
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
import org.apache.carbondata.mv.plans
import org.apache.carbondata.mv.plans._
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index 6c82598..cdf0aa4 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -20,13 +20,13 @@ package org.apache.carbondata.mv.plans.modular
import scala.collection._
import scala.collection.mutable.{HashMap, MultiMap}
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.mv.plans._
@@ -45,6 +45,10 @@ abstract class ModularPlan
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
+ def canonicalizedDef: ModularPlan = {
+ canonicalized
+ }
+
def childrenResolved: Boolean = children.forall(_.resolved)
private var statsCache: Option[Statistics] = None
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
index 491d394..b7512d2 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.mv.plans.modular.Flags._
@@ -47,14 +47,7 @@ case class ModularRelation(databaseName: String,
override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
- val output = outputList.map(_.toAttribute)
- val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
- table => AttributeMap(table.output.zip(output))
- }
- val rewrites = mapSeq(0)
- val attributeStats = AttributeMap(stats.attributeStats.iterator
- .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
- Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+ SparkSQLUtil.getStatisticsObj(outputList, plan, stats)
}
override def output: Seq[Attribute] = outputList.map(_.toAttribute)
@@ -155,10 +148,6 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
val output = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
.outputList.map(_.toAttribute)
- val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
- table => AttributeMap(table.output.zip(output))
- }
- val rewrites = mapSeq.head
val aliasMap = AttributeMap(
source.asInstanceOf[GroupBy].outputList.collect {
case a@Alias(ar: Attribute, _) => (ar, a.toAttribute)
@@ -167,12 +156,7 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
case a@Alias(AggregateExpression(Last(ar: Attribute, _), _, _, _), _) =>
(ar, a.toAttribute)
})
- val aStatsIterator = stats.attributeStats.iterator.map { pair => (rewrites(pair._1), pair._2) }
- val attributeStats =
- AttributeMap(
- aStatsIterator.map(pair => (aliasMap.get(pair._1).getOrElse(pair._1), pair._2)).toSeq)
-
- Statistics(stats.sizeInBytes, None, attributeStats, stats.hints)
+ SparkSQLUtil.getStatisticsObj(output, plan, stats, Option(aliasMap))
}
override def output: Seq[Attribute] = source.output
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
index d255359..a0d16cb 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.mv.plans.modular
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, ScalarSubquery}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 42cf15c..9182a89 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.util.SparkSQLUtil
object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
val conf = new SQLConf()
- .copy(SQLConf.CASE_SENSITIVE -> true, SQLConf.STARSCHEMA_DETECTION -> true)
+
protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
def batches: Seq[Batch] = {
@@ -40,7 +40,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
Batch(
"Finish Analysis", Once,
EliminateSubqueryAliases,
- EliminateView,
+ SparkSQLUtil.getEliminateViewObj(),
ReplaceExpressions,
ComputeCurrentTime,
// GetCurrentDatabase(sessionCatalog),
@@ -59,7 +59,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
CombineUnions) ::
Batch(
"Pullup Correlated Expressions", Once,
- PullupCorrelatedPredicates) ::
+ SparkSQLUtil.getPullupCorrelatedPredicatesObj()) ::
Batch(
"Subquery", Once,
OptimizeSubqueries) ::
@@ -107,7 +107,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
SimplifyCaseConversionExpressions,
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
- RemoveRedundantAliases,
+ SparkSQLUtil.getRemoveRedundantAliasesObj(),
RemoveRedundantProject,
SimplifyCreateStructOps,
SimplifyCreateArrayOps,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index de65e37..0652575 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -17,9 +17,8 @@
package org.apache.carbondata.mv.plans.util
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference,
- AttributeSet, Expression, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -27,6 +26,8 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.mv.plans.modular.Flags._
import org.apache.carbondata.mv.plans.modular.JoinEdge
+
+
/**
* SelectModule is extracted from logical plan of SPJG query. All join conditions
* filter, and project operators in a single Aggregate-less subtree of logical plan
@@ -335,13 +336,13 @@ object ExtractTableModule extends PredicateHelper {
def unapply(plan: LogicalPlan): Option[ReturnType] = {
plan match {
// uncomment for cloudera1 version
-// case m: CatalogRelation =>
-// Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
-// Seq.empty)
-// uncomment for apache version
+ // case m: CatalogRelation =>
+ // Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+ // Seq.empty)
+ // uncomment for apache version
case m: HiveTableRelation =>
- Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
- Seq.empty)
+ Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+ Seq.empty)
case l: LogicalRelation =>
val tableIdentifier = l.catalogTable.map(_.identifier)
val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
index 307fff0..d2e4375 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
@@ -17,6 +17,7 @@
package org.apache.carbondata.mv.plans.util
+import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, BitwiseAnd, Cast, Expression, Grouping, GroupingID, Literal, NamedExpression, ShiftRight}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.types.{ByteType, IntegerType}
@@ -398,10 +399,10 @@ trait SQLBuildDSL {
// it back.
case ar: AttributeReference if ar == gid => GroupingID(Nil)
case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
- case a@Cast(
+ case a@MatchCastExpression(
BitwiseAnd(
ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)),
- Literal(1, IntegerType)), ByteType, None) if ar == gid =>
+ Literal(1, IntegerType)), ByteType) if ar == gid =>
// for converting an expression to its original SQL format grouping(col)
val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
groupByExprs.lift(idx).map(Grouping).getOrElse(a)
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
index b6e62eb..815fb58 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -220,15 +220,6 @@ class SQLBuilder private(
}
}
}
-
- object RemoveCasts extends Rule[ModularPlan] {
- def apply(tree: ModularPlan): ModularPlan = {
- tree transformAllExpressions {
- case Cast(e, dataType, _) => e
- }
- }
- }
-
}
object SQLBuilder {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
index c46124b..1f8d7b4 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.mv.plans.util
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.trees.TreeNode
case class Signature(groupby: Boolean = true, datasets: Set[String] = Set.empty)
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
index 01ca448..103333d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
@@ -842,14 +842,14 @@ object Tpcds_1_4_Tables {
|STORED AS parquet
""".stripMargin.trim,
s"""
- | CREATE TABLE fact_table1 (empname String, designation String, doj Timestamp,
+ | CREATE TABLE IF NOT EXISTS fact_table1 (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
| utilization int,salary int)
|STORED AS parquet
""".stripMargin.trim,
s"""
- | CREATE TABLE fact_table2 (empname String, designation String, doj Timestamp,
+ | CREATE TABLE IF NOT EXISTS fact_table2 (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
| utilization int,salary int)
diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala
index c74491c..dad8f8a 100644
--- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala
+++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala
@@ -19,9 +19,9 @@ package org.apache.carbondata.mv.plans
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.scalatest.BeforeAndAfter
-
import org.apache.carbondata.mv.dsl.Plans._
import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
class ModularToSQLSuite extends ModularPlanTest with BeforeAndAfter {
@@ -29,7 +29,7 @@ class ModularToSQLSuite extends ModularPlanTest with BeforeAndAfter {
val spark = sqlContext
val testHive = sqlContext.sparkSession
- val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+ val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
ignore("convert modular plans to sqls") {
diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala
index 631eca2..5d4a05f 100644
--- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala
+++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala
@@ -20,17 +20,17 @@ package org.apache.carbondata.mv.plans
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.scalatest.BeforeAndAfterAll
-
import org.apache.carbondata.mv.dsl.Plans._
import org.apache.carbondata.mv.plans.modular.ModularPlanSignatureGenerator
import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
class SignatureSuite extends ModularPlanTest with BeforeAndAfterAll {
import org.apache.carbondata.mv.testutil.TestSQLBatch._
val spark = sqlContext
val testHive = sqlContext.sparkSession
- val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+ val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
ignore("test signature computing") {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala
new file mode 100644
index 0000000..4c77b29
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+object EmptyRule extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan
+ }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 9ffe6e1..7903ac7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -23,10 +23,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.analysis.EmptyRule
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
object SparkSQLUtil {
def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -51,6 +55,87 @@ object SparkSQLUtil {
}
}
+ def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
+ : NamedExpression = {
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
+ clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
+ invoke(null, r, input).asInstanceOf[NamedExpression]
+ } else {
+ r
+ }
+ }
+
+ def getStatisticsObj(outputList: Seq[NamedExpression],
+ plan: LogicalPlan, stats: Statistics,
+ aliasMap: Option[AttributeMap[Attribute]] = None)
+ : Statistics = {
+ val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val output = outputList.map(_.toAttribute)
+ val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+ table => AttributeMap(table.output.zip(output))
+ }
+ val rewrites = mapSeq.head
+ val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
+ getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
+ var attributeStats = AttributeMap(attributes.iterator
+ .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+ if (aliasMap.isDefined) {
+ attributeStats = AttributeMap(
+ attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+ }
+ val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
+ CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+ stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
+ } else {
+ val output = outputList.map(_.name)
+ val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+ table => table.output.map(_.name).zip(output).toMap
+ }
+ val rewrites = mapSeq.head
+ val colStats = CarbonReflectionUtils.getField("colStats", stats)
+ .asInstanceOf[Map[String, ColumnStat]]
+ var attributeStats = colStats.iterator
+ .map { pair => (rewrites(pair._1), pair._2) }.toMap
+ if (aliasMap.isDefined) {
+ val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
+ attributeStats =
+ attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
+ , pair._2))
+ }
+ CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+ stats.rowCount, attributeStats).asInstanceOf[Statistics]
+ }
+ }
+
+ def getEliminateViewObj(): Rule[LogicalPlan] = {
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
+ CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+ } else {
+ EmptyRule
+ }
+ }
+
+ def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
+ CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+ } else {
+ EmptyRule
+ }
+ }
+
+ def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
+ CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+ } else {
+ EmptyRule
+ }
+ }
+
def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
@@ -59,7 +144,12 @@ object SparkSQLUtil {
val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
- } else {
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ }
+ else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
@@ -72,7 +162,12 @@ object SparkSQLUtil {
val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
- } else {
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
+ }
+ else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
@@ -85,6 +180,10 @@ object SparkSQLUtil {
val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
+ CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+ .asInstanceOf[Rule[LogicalPlan]]
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
@@ -98,7 +197,11 @@ object SparkSQLUtil {
val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
.asInstanceOf[Rule[LogicalPlan]]
- } else {
+ } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
+ CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+ }
+ else {
throw new UnsupportedOperationException("Spark version not supported")
}
}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index ee635e0..bdacfcd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -320,6 +320,13 @@ object CarbonReflectionUtils {
._1.asInstanceOf[RunnableCommand]
}
+ def createSingleObject(className: String): Any = {
+ val classMirror = universe.runtimeMirror(getClass.getClassLoader)
+ val classTest = classMirror.staticModule(className)
+ val methods = classMirror.reflectModule(classTest)
+ methods.instance
+ }
+
def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
diff --git a/integration/spark2/src/main/scala/org/apache/spark/Logging.scala b/integration/spark2/src/main/scala/org/apache/spark/Logging.scala
new file mode 100644
index 0000000..62d6862
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/Logging.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+trait Logging extends org.apache.spark.internal.Logging {
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala
new file mode 100644
index 0000000..6f60d0f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+class SQLConf extends org.apache.spark.sql.internal.SQLConf {
+ val CASE_SENSITIVE = true
+
+ val STARSCHEMA_DETECTION = true
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 69541eb..79a6240 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -25,7 +25,9 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.types.{DataType, Metadata}
@@ -81,4 +83,8 @@ object CarbonToSparkAdapter {
tablePath: String): CatalogStorageFormat = {
storageFormat.copy(properties = map, locationUri = Some(tablePath))
}
+
+ def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+ Seq(OptimizeCodegen(conf))
+ }
}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
new file mode 100644
index 0000000..eb3e88d
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import com.google.common.base.Objects
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive a data source.
+ */
+case class HiveTableRelation(
+ tableMeta: CatalogTable,
+ dataCols: Seq[AttributeReference],
+ partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
+ assert(tableMeta.identifier.database.isDefined)
+ assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
+ assert(tableMeta.schema.sameType(dataCols.toStructType))
+
+ // The partition column should always appear after data columns.
+ override def output: Seq[AttributeReference] = dataCols ++ partitionCols
+
+ def isPartitioned: Boolean = partitionCols.nonEmpty
+
+ override def equals(relation: Any): Boolean = relation match {
+ case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ Objects.hashCode(tableMeta.identifier, output)
+ }
+
+ override def newInstance(): HiveTableRelation = copy(
+ dataCols = dataCols.map(_.newInstance()),
+ partitionCols = partitionCols.map(_.newInstance()))
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
new file mode 100644
index 0000000..9a88255
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class MigrateOptimizer {
+
+}
+
+/**
+ * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
+ */
+object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Deduplicate(keys, child, streaming) if !streaming =>
+ val keyExprIds = keys.map(_.exprId)
+ val aggCols = child.output.map { attr =>
+ if (keyExprIds.contains(attr.exprId)) {
+ attr
+ } else {
+ Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
+ }
+ }
+ Aggregate(keys, aggCols, child)
+ }
+}
+
+/** A logical plan for `dropDuplicates`. */
+case class Deduplicate(
+ keys: Seq[Attribute],
+ child: LogicalPlan,
+ streaming: Boolean) extends UnaryNode {
+
+ override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * Remove projections from the query plan that do not make any modifications.
+ */
+object RemoveRedundantProject extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case p @ Project(_, child) if p.output == child.output => child
+ }
+}
+
+/**
+ * push down operations into [[CreateNamedStructLike]].
+ */
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformExpressionsUp {
+ // push down field extraction
+ case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
+ createNamedStructLike.valExprs(ordinal)
+ }
+ }
+}
+
+/**
+ * push down operations into [[CreateArray]].
+ */
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformExpressionsUp {
+ // push down field selection (array of structs)
+ case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
+ // instead of selecting the field on the entire array,
+ // select it from each member of the array.
+ // pushing down the operation this way open other optimizations opportunities
+ // (i.e. struct(...,x,...).x)
+ CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
+ // push down item selection.
+ case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+ // instead of creating the array and then selecting one row,
+ // remove array creation altogether.
+ if (idx >= 0 && idx < elems.size) {
+ // valid index
+ elems(idx)
+ } else {
+ // out of bounds, mimic the runtime behavior and return null
+ Literal(null, ga.dataType)
+ }
+ }
+ }
+}
+
+/**
+ * push down operations into [[CreateMap]].
+ */
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformExpressionsUp {
+ case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
+ }
+ }
+}
+
+
+/**
+ * Removes MapObjects when the following conditions are satisfied
+ * 1. MapObjects(... LambdaVariable(..., false) ...), which means the input and output
+ * types are non-nullable primitive types
+ * 2. no custom collection class is specified as the representation of the data items.
+ */
+object EliminateMapObjects extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData
+ }
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
new file mode 100644
index 0000000..4abf189
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
+ * recognize a subquery as such, and it allows us to write subquery aware transformations.
+ */
+case class Subquery(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 47feae0..5caa4dd 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -36,13 +36,13 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.spark.util.CarbonReflectionUtils
/**
* This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -301,10 +301,25 @@ class CarbonAnalyzer(catalog: SessionCatalog,
conf: CatalystConf,
sparkSession: SparkSession,
analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+ val mvPlan = try {
+ CarbonReflectionUtils.createObject(
+ "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
+ sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
+ } catch {
+ case e: Exception =>
+ null
+ }
+
override def execute(plan: LogicalPlan): LogicalPlan = {
var logicalPlan = analyzer.execute(plan)
logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+ if (mvPlan != null) {
+ mvPlan.apply(logicalPlan)
+ } else {
+ logicalPlan
+ }
}
}