Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/21 05:52:20 UTC

[carbondata] branch master updated: [CARBONDATA-3309] MV datamap supports Spark 2.1

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d7c8ad  [CARBONDATA-3309] MV datamap supports Spark 2.1
4d7c8ad is described below

commit 4d7c8ada98ed15511d0abff349b64522f047344b
Author: qiuchenjian <80...@qq.com>
AuthorDate: Sun Mar 17 20:04:48 2019 +0800

    [CARBONDATA-3309] MV datamap supports Spark 2.1
    
    [Problem]
    MV datamap does not support Spark 2.1, so support for it needs to be added
    
    [Solution]
    The following are the modification points; all MV test cases pass on Spark 2.1
    
    Classes that cannot be accessed in Spark 2.1:
    (1). org.apache.spark.internal.Logging
    (2). org.apache.spark.sql.internal.SQLConf
    Solution: create classes in the carbon project that extend the above classes, as sketched below
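
    The two shims added by this patch are thin wrappers; condensed from the new
    Logging.scala and SQLConf.scala files in the diff below:

        // integration/spark2/src/main/scala/org/apache/spark/Logging.scala
        package org.apache.spark

        trait Logging extends org.apache.spark.internal.Logging {
        }

        // integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala
        package org.apache.spark.sql

        class SQLConf extends org.apache.spark.sql.internal.SQLConf {
          val CASE_SENSITIVE = true

          val STARSCHEMA_DETECTION = true
        }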
    
    Classes that Spark 2.1 does not have:
    (1). org.apache.spark.sql.catalyst.plans.logical.Subquery
    (2). org.apache.spark.sql.catalyst.catalog.interface.HiveTableRelation
    Solution: use CatalogRelation instead (and do not use HiveTableRelation in LogicalPlanSignatureGenerator); move the Subquery code into the carbon project, as shown below
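
    For the Spark 2.1 build the missing nodes are copied into the carbon project;
    condensed from the new Subquery.scala in the diff below:

        package org.apache.spark.sql.catalyst.plans.logical

        import org.apache.spark.sql.catalyst.expressions.Attribute

        // Inserted at the top of an optimized subquery so that it can be
        // recognized and handled by subquery-aware transformations.
        case class Subquery(child: LogicalPlan) extends UnaryNode {
          override def output: Seq[Attribute] = child.output
        }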
    
    Methods that cannot be accessed in Spark 2.1:
    (1). sparkSession.sessionState.catalog.lookupRelation
    Solution: add a sessionState helper to SparkSQLUtil and route the call through it
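
    Callers in the MV module then go through SparkSQLUtil, for example (from the
    MVDataMapProvider.scala hunk below; tableIdentifier stands for the
    TableIdentifier built from the datamap's relation identifier):

        // before: not accessible when compiled against Spark 2.1
        //   sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
        // after:
        SparkSQLUtil.sessionState(sparkSession).catalog.lookupRelation(tableIdentifier)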
    
    Classes whose interfaces changed:
    (1). org.apache.spark.sql.catalyst.expressions.SortOrder
    (2). org.apache.spark.sql.catalyst.expressions.Cast
    (3). org.apache.spark.sql.catalyst.plans.Statistics
    Solution: adapt the calling code to the changed interfaces
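
    For example, the SortOrder constructor in Spark 2.1 takes no
    sameOrderExpressions parameter, so MVHelper now uses the two-argument form
    (see the MVHelper.scala hunk below); Cast is matched through the MatchCast
    extractor for the same reason:

        // Spark 2.2+: SortOrder(expressions.head, s.direction, s.sameOrderExpressions)
        // form that also compiles on Spark 2.1:
        SortOrder(expressions.head, s.direction)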
    
    Methods and fields that Spark 2.1 does not have:
    (1). normalizeExprId and canonicalized of org.apache.spark.sql.catalyst.plans.QueryPlan
    (2). CASE_SENSITIVE of SQLConf
    (3). STARSCHEMA_DETECTION of SQLConf
    Solution: do not use normalizeExprId, canonicalized, CASE_SENSITIVE or STARSCHEMA_DETECTION when running on Spark 2.1
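
    Where expression-id normalization is still wanted on Spark 2.2+, SparkSQLUtil
    invokes QueryPlan.normalizeExprId reflectively and falls back to the untouched
    expression on 2.1; condensed from the SparkSQLUtil.scala hunk below:

        def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
            : NamedExpression = {
          if (SparkUtil.isSparkVersionXandAbove("2.2")) {
            val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
            clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq])
              .invoke(null, r, input).asInstanceOf[NamedExpression]
          } else {
            r // Spark 2.1: no normalization available, return the expression as-is
          }
        }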
    
    Logical-plan optimization rules that Spark 2.1 does not have:
    (1). SimplifyCreateMapOps
    (2). SimplifyCreateArrayOps
    (3). SimplifyCreateStructOps
    (4). RemoveRedundantProject
    (5). RemoveRedundantAliases
    (6). PullupCorrelatedPredicates
    (7). ReplaceDeduplicateWithAggregate
    (8). EliminateView
    Solution: delete them or move their code into the carbon project; on Spark 2.1 the missing rules are replaced by a no-op, as shown below
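
    Rules that exist only on Spark 2.2+ are looked up reflectively in SparkSQLUtil
    and replaced by the no-op EmptyRule on 2.1; condensed from the
    SparkSQLUtil.scala hunk below:

        def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
          if (SparkUtil.isSparkVersionXandAbove("2.2")) {
            val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
            CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
          } else {
            EmptyRule // no-op rule added by this patch for Spark 2.1
          }
        }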
    
    Rule instances are generated in SparkSQLUtil to adapt them to the Spark 2.1 version
    
    Query SQL passes the MV check on Spark 2.1 (CarbonSessionState)
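
    On Spark 2.1, CarbonAnalyzer loads MVAnalyzerRule reflectively and applies it
    after the pre-aggregate rules, falling back to the plain analyzed plan when the
    rule cannot be created; condensed from the CarbonSessionState.scala hunk below:

        val mvPlan = try {
          CarbonReflectionUtils.createObject(
            "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
            sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
        } catch {
          case e: Exception =>
            null
        }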
    
    This closes #3150
---
 .../carbondata/mv/datamap/MVDataMapProvider.scala  |   2 +-
 .../apache/carbondata/mv/datamap/MVHelper.scala    |   2 +-
 .../apache/carbondata/mv/rewrite/MatchMaker.scala  |   2 +-
 .../mv/rewrite/SummaryDatasetCatalog.scala         |   5 +-
 .../carbondata/mv/rewrite/TestSQLSuite.scala       |   4 +-
 .../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala    |   4 +-
 .../mv/expressions/modular/subquery.scala          |  13 ++-
 .../mv/plans/modular/AggregatePushDown.scala       |   8 +-
 .../carbondata/mv/plans/modular/Harmonizer.scala   |   2 +-
 .../carbondata/mv/plans/modular/ModularPlan.scala  |   8 +-
 .../mv/plans/modular/ModularRelation.scala         |  22 +---
 .../carbondata/mv/plans/modular/Modularizer.scala  |   2 +-
 .../mv/plans/util/BirdcageOptimizer.scala          |  10 +-
 .../mv/plans/util/Logical2ModularExtractions.scala |  19 +--
 .../carbondata/mv/plans/util/SQLBuildDSL.scala     |   5 +-
 .../carbondata/mv/plans/util/SQLBuilder.scala      |   9 --
 .../carbondata/mv/plans/util/Signature.scala       |   2 +-
 .../carbondata/mv/testutil/Tpcds_1_4_Tables.scala  |   4 +-
 .../carbondata/mv/plans/ModularToSQLSuite.scala    |   4 +-
 .../carbondata/mv/plans/SignatureSuite.scala       |   4 +-
 .../spark/sql/catalyst/analysis/EmptyRule.scala    |  26 +++++
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   | 113 +++++++++++++++++-
 .../apache/spark/util/CarbonReflectionUtils.scala  |   7 ++
 .../src/main/scala/org/apache/spark/Logging.scala  |  22 ++++
 .../main/scala/org/apache/spark/sql/SQLConf.scala  |  23 ++++
 .../apache/spark/sql/CarbonToSparkAdapater.scala   |   8 +-
 .../sql/catalyst/catalog/HiveTableRelation.scala   |  56 +++++++++
 .../sql/catalyst/optimizer/MigrateOptimizer.scala  | 129 +++++++++++++++++++++
 .../sql/catalyst/plans/logical/Subquery.scala      |  28 +++++
 .../apache/spark/sql/hive/CarbonSessionState.scala |  19 ++-
 30 files changed, 481 insertions(+), 81 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 7108bf8..5ffc46a 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -81,7 +81,7 @@ class MVDataMapProvider(
       val identifier = dataMapSchema.getRelationIdentifier
       val logicalPlan =
         new FindDataSourceTable(sparkSession).apply(
-          sparkSession.sessionState.catalog.lookupRelation(
+          SparkSQLUtil.sessionState(sparkSession).catalog.lookupRelation(
           TableIdentifier(identifier.getTableName,
             Some(identifier.getDatabaseName)))) match {
           case s: SubqueryAlias => s.child
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 8baa924..9bed098 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -547,7 +547,7 @@ object MVHelper {
                 relation,
                 aliasMap,
                 keepAlias = false)
-            SortOrder(expressions.head, s.direction, s.sameOrderExpressions)
+            SortOrder(expressions.head, s.direction)
           }
         // In case of limit it goes to other.
         case other => other
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
index 2c5d8f4..493539b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 026d6b7..ea0bfea 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.FindDataSourceTable
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.core.datamap.DataMapCatalog
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
@@ -34,6 +35,7 @@ import org.apache.carbondata.mv.plans.modular.{Flags, ModularPlan, ModularRelati
 import org.apache.carbondata.mv.plans.util.Signature
 import org.apache.carbondata.mv.session.MVSession
 
+
 /** Holds a summary logical plan */
 private[mv] case class SummaryDataset(signature: Option[Signature],
     plan: LogicalPlan,
@@ -114,7 +116,8 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
           mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized
       val signature = modularPlan.signature
       val identifier = dataMapSchema.getRelationIdentifier
-      val output = new FindDataSourceTable(sparkSession).apply(sparkSession.sessionState.catalog
+      val output = new FindDataSourceTable(sparkSession)
+        .apply(SparkSQLUtil.sessionState(sparkSession).catalog
         .lookupRelation(TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))))
         .output
       val relation = ModularRelation(identifier.getDatabaseName,
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
index 25f07e4..95450b2 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
@@ -20,15 +20,15 @@ package org.apache.carbondata.mv.rewrite
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.scalatest.BeforeAndAfter
-
 import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
 
 class TestSQLSuite extends ModularPlanTest with BeforeAndAfter { 
   import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._
 
   val spark = sqlContext
   val testHive = sqlContext.sparkSession
-  val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+  val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
   
   ignore("protypical mqo rewrite test") {
     
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
index 882a43a..b30a131 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
@@ -20,8 +20,8 @@ package org.apache.carbondata.mv.rewrite
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.scalatest.BeforeAndAfter
-
 import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
 //import org.apache.spark.sql.catalyst.SQLBuilder
 import java.io.{File, PrintWriter}
 
@@ -31,7 +31,7 @@ class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter {
 
   val spark = sqlContext
   val testHive = sqlContext.sparkSession
-  val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+  val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
 
   test("test using tpc-ds queries") {
 
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
index cfe341a..e41c9ca 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
@@ -17,9 +17,9 @@
 
 package org.apache.carbondata.mv.expressions.modular
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, AttributeSet, Expression, ExprId, LeafExpression, NamedExpression, OuterReference, PlanExpression, Predicate, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.mv.plans.modular.ModularPlan
 
@@ -53,7 +53,8 @@ abstract class ModularSubquery(
   def canonicalize(attrs: AttributeSeq): ModularSubquery = {
     // Normalize the outer references in the subquery plan.
     val normalizedPlan = plan.transformAllExpressions {
-      case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
+      case OuterReference(r) => OuterReference(SparkSQLUtil.
+        invokeQueryPlannormalizeExprId(r, attrs))
     }
     withNewPlan(normalizedPlan).canonicalized.asInstanceOf[ModularSubquery]
   }
@@ -80,7 +81,7 @@ case class ScalarModularSubquery(
 
   override lazy val canonicalized: Expression = {
     ScalarModularSubquery(
-      plan.canonicalized,
+      plan.canonicalizedDef,
       children.map(_.canonicalized),
       ExprId(0))
   }
@@ -122,7 +123,7 @@ case class ModularListQuery(
 
   override lazy val canonicalized: Expression = {
     ModularListQuery(
-      plan.canonicalized,
+      plan.canonicalizedDef,
       children.map(_.canonicalized),
       ExprId(0))
   }
@@ -153,7 +154,7 @@ case class ModularExists(
 
   override lazy val canonicalized: Expression = {
     ModularExists(
-      plan.canonicalized,
+      plan.canonicalizedDef,
       children.map(_.canonicalized),
       ExprId(0))
   }
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
index 77efaf7..a19066d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
@@ -19,7 +19,8 @@ package org.apache.carbondata.mv.plans.modular
 
 import scala.collection._
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, ExprId, Literal, NamedExpression}
+import org.apache.spark.sql.CarbonExpressions.MatchCast
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Divide, ExprId, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 
 trait AggregatePushDown { // self: ModularPlan =>
@@ -106,12 +107,11 @@ trait AggregatePushDown { // self: ModularPlan =>
         } else {
           Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
         }
-      case sum@AggregateExpression(Sum(Cast(expr, dataType, timeZoneId)), _, false, _)
-        if expr.isInstanceOf[Attribute] =>
+      case sum@AggregateExpression(Sum(cast@MatchCast(expr, dataType)), _, false, _) =>
         val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
           .asInstanceOf[Attribute]
         if (fact.outputSet.contains(tAttr)) {
-          val sum1 = AggregateExpression(Sum(Cast(tAttr, dataType, timeZoneId)), sum.mode, false)
+          val sum1 = AggregateExpression(Sum(cast), sum.mode, false)
           val alias = Alias(sum1, sum1.toString)()
           val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
           val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index ebe8c8c..cb2043e 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.mv.plans.modular
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
 
 import org.apache.carbondata.mv.plans
 import org.apache.carbondata.mv.plans._
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index 6c82598..cdf0aa4 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -20,13 +20,13 @@ package org.apache.carbondata.mv.plans.modular
 import scala.collection._
 import scala.collection.mutable.{HashMap, MultiMap}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.mv.plans._
@@ -45,6 +45,10 @@ abstract class ModularPlan
 
   lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
 
+  def canonicalizedDef: ModularPlan = {
+    canonicalized
+  }
+
   def childrenResolved: Boolean = children.forall(_.resolved)
 
   private var statsCache: Option[Statistics] = None
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
index 491d394..b7512d2 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.mv.plans.modular.Flags._
@@ -47,14 +47,7 @@ case class ModularRelation(databaseName: String,
   override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
     val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
     val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
-    val output = outputList.map(_.toAttribute)
-    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
-      table => AttributeMap(table.output.zip(output))
-    }
-    val rewrites = mapSeq(0)
-    val attributeStats = AttributeMap(stats.attributeStats.iterator
-      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
-    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+    SparkSQLUtil.getStatisticsObj(outputList, plan, stats)
   }
 
   override def output: Seq[Attribute] = outputList.map(_.toAttribute)
@@ -155,10 +148,6 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
     val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
     val output = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
       .outputList.map(_.toAttribute)
-    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
-      table => AttributeMap(table.output.zip(output))
-    }
-    val rewrites = mapSeq.head
     val aliasMap = AttributeMap(
       source.asInstanceOf[GroupBy].outputList.collect {
         case a@Alias(ar: Attribute, _) => (ar, a.toAttribute)
@@ -167,12 +156,7 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
         case a@Alias(AggregateExpression(Last(ar: Attribute, _), _, _, _), _) =>
           (ar, a.toAttribute)
       })
-    val aStatsIterator = stats.attributeStats.iterator.map { pair => (rewrites(pair._1), pair._2) }
-    val attributeStats =
-      AttributeMap(
-        aStatsIterator.map(pair => (aliasMap.get(pair._1).getOrElse(pair._1), pair._2)).toSeq)
-
-    Statistics(stats.sizeInBytes, None, attributeStats, stats.hints)
+    SparkSQLUtil.getStatisticsObj(output, plan, stats, Option(aliasMap))
   }
 
   override def output: Seq[Attribute] = source.output
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
index d255359..a0d16cb 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.mv.plans.modular
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 42cf15c..9182a89 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.util.SparkSQLUtil
 
 object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
 
   val conf = new SQLConf()
-    .copy(SQLConf.CASE_SENSITIVE -> true, SQLConf.STARSCHEMA_DETECTION -> true)
+
   protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
 
   def batches: Seq[Batch] = {
@@ -40,7 +40,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
     Batch(
       "Finish Analysis", Once,
       EliminateSubqueryAliases,
-      EliminateView,
+      SparkSQLUtil.getEliminateViewObj(),
       ReplaceExpressions,
       ComputeCurrentTime,
       //      GetCurrentDatabase(sessionCatalog),
@@ -59,7 +59,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
       CombineUnions) ::
     Batch(
       "Pullup Correlated Expressions", Once,
-      PullupCorrelatedPredicates) ::
+      SparkSQLUtil.getPullupCorrelatedPredicatesObj()) ::
     Batch(
       "Subquery", Once,
       OptimizeSubqueries) ::
@@ -107,7 +107,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
         SimplifyCaseConversionExpressions,
         RewriteCorrelatedScalarSubquery,
         EliminateSerialization,
-        RemoveRedundantAliases,
+        SparkSQLUtil.getRemoveRedundantAliasesObj(),
         RemoveRedundantProject,
         SimplifyCreateStructOps,
         SimplifyCreateArrayOps,
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
index de65e37..0652575 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -17,9 +17,8 @@
 
 package org.apache.carbondata.mv.plans.util
 
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference,
-  AttributeSet, Expression, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -27,6 +26,8 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.carbondata.mv.plans.modular.Flags._
 import org.apache.carbondata.mv.plans.modular.JoinEdge
 
+
+
 /**
  * SelectModule is extracted from logical plan of SPJG query.  All join conditions
  * filter, and project operators in a single Aggregate-less subtree of logical plan
@@ -335,13 +336,13 @@ object ExtractTableModule extends PredicateHelper {
   def unapply(plan: LogicalPlan): Option[ReturnType] = {
     plan match {
       // uncomment for cloudera1 version
-//      case m: CatalogRelation =>
-//        Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
-//          Seq.empty)
-//       uncomment for apache version
+      //      case m: CatalogRelation =>
+      //        Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+      //          Seq.empty)
+      //       uncomment for apache version
       case m: HiveTableRelation =>
-            Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
-              Seq.empty)
+        Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+          Seq.empty)
       case l: LogicalRelation =>
         val tableIdentifier = l.catalogTable.map(_.identifier)
         val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
index 307fff0..d2e4375 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.mv.plans.util
 
+import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, BitwiseAnd, Cast, Expression, Grouping, GroupingID, Literal, NamedExpression, ShiftRight}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.types.{ByteType, IntegerType}
@@ -398,10 +399,10 @@ trait SQLBuildDSL {
               // it back.
               case ar: AttributeReference if ar == gid => GroupingID(Nil)
               case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
-              case a@Cast(
+              case a@MatchCastExpression(
               BitwiseAnd(
               ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)),
-              Literal(1, IntegerType)), ByteType, None) if ar == gid =>
+              Literal(1, IntegerType)), ByteType) if ar == gid =>
                 // for converting an expression to its original SQL format grouping(col)
                 val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
                 groupByExprs.lift(idx).map(Grouping).getOrElse(a)
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
index b6e62eb..815fb58 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -220,15 +220,6 @@ class SQLBuilder private(
       }
     }
   }
-
-  object RemoveCasts extends Rule[ModularPlan] {
-    def apply(tree: ModularPlan): ModularPlan = {
-      tree transformAllExpressions {
-        case Cast(e, dataType, _) => e
-      }
-    }
-  }
-
 }
 
 object SQLBuilder {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
index c46124b..1f8d7b4 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.mv.plans.util
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 case class Signature(groupby: Boolean = true, datasets: Set[String] = Set.empty)
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
index 01ca448..103333d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
@@ -842,14 +842,14 @@ object Tpcds_1_4_Tables {
        |STORED AS parquet
           """.stripMargin.trim,
     s"""
-       | CREATE TABLE fact_table1 (empname String, designation String, doj Timestamp,
+       | CREATE TABLE IF NOT EXISTS fact_table1 (empname String, designation String, doj Timestamp,
        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
        |  utilization int,salary int)
        |STORED AS parquet
           """.stripMargin.trim,
     s"""
-       | CREATE TABLE fact_table2 (empname String, designation String, doj Timestamp,
+       | CREATE TABLE IF NOT EXISTS fact_table2 (empname String, designation String, doj Timestamp,
        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
        |  utilization int,salary int)
diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala
index c74491c..dad8f8a 100644
--- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala
+++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala
@@ -19,9 +19,9 @@ package org.apache.carbondata.mv.plans
 
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.scalatest.BeforeAndAfter
-
 import org.apache.carbondata.mv.dsl.Plans._
 import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
 
 class ModularToSQLSuite extends ModularPlanTest with BeforeAndAfter {
 
@@ -29,7 +29,7 @@ class ModularToSQLSuite extends ModularPlanTest with BeforeAndAfter {
 
   val spark = sqlContext
   val testHive = sqlContext.sparkSession
-  val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+  val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
   
   ignore("convert modular plans to sqls") {
     
diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala
index 631eca2..5d4a05f 100644
--- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala
+++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala
@@ -20,17 +20,17 @@ package org.apache.carbondata.mv.plans
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.scalatest.BeforeAndAfterAll
-
 import org.apache.carbondata.mv.dsl.Plans._
 import org.apache.carbondata.mv.plans.modular.ModularPlanSignatureGenerator
 import org.apache.carbondata.mv.testutil.ModularPlanTest
+import org.apache.spark.sql.util.SparkSQLUtil
 
 class SignatureSuite extends ModularPlanTest with BeforeAndAfterAll {
   import org.apache.carbondata.mv.testutil.TestSQLBatch._
 
   val spark = sqlContext
   val testHive = sqlContext.sparkSession
-  val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
+  val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient()
   
   ignore("test signature computing") {
 
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala
new file mode 100644
index 0000000..4c77b29
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+object EmptyRule extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan
+  }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 9ffe6e1..7903ac7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -23,10 +23,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.analysis.EmptyRule
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -51,6 +55,87 @@ object SparkSQLUtil {
     }
   }
 
+  def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq)
+      : NamedExpression = {
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan")
+      clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]).
+        invoke(null, r, input).asInstanceOf[NamedExpression]
+    } else {
+      r
+    }
+  }
+
+  def getStatisticsObj(outputList: Seq[NamedExpression],
+                       plan: LogicalPlan, stats: Statistics,
+                       aliasMap: Option[AttributeMap[Attribute]] = None)
+  : Statistics = {
+    val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics"
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val output = outputList.map(_.toAttribute)
+      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+        table => AttributeMap(table.output.zip(output))
+      }
+      val rewrites = mapSeq.head
+      val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils.
+        getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]]
+      var attributeStats = AttributeMap(attributes.iterator
+        .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+      if (aliasMap.isDefined) {
+        attributeStats = AttributeMap(
+          attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+      }
+      val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object]
+      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+        stats.rowCount, attributeStats, hints).asInstanceOf[Statistics]
+    } else {
+      val output = outputList.map(_.name)
+      val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+        table => table.output.map(_.name).zip(output).toMap
+      }
+      val rewrites = mapSeq.head
+      val colStats = CarbonReflectionUtils.getField("colStats", stats)
+        .asInstanceOf[Map[String, ColumnStat]]
+      var attributeStats = colStats.iterator
+        .map { pair => (rewrites(pair._1), pair._2) }.toMap
+      if (aliasMap.isDefined) {
+        val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name))
+        attributeStats =
+          attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1)
+            , pair._2))
+      }
+      CarbonReflectionUtils.createObject(className, stats.sizeInBytes,
+        stats.rowCount, attributeStats).asInstanceOf[Statistics]
+    }
+  }
+
+  def getEliminateViewObj(): Rule[LogicalPlan] = {
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.analysis.EliminateView"
+      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      EmptyRule
+    }
+  }
+
+  def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = {
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates"
+      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      EmptyRule
+    }
+  }
+
+  def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = {
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases"
+      CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      EmptyRule
+    }
+  }
+
   def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
     if (SparkUtil.isSparkVersionEqualTo("2.2")) {
       val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
@@ -59,7 +144,12 @@ object SparkSQLUtil {
       val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
       CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
         .asInstanceOf[Rule[LogicalPlan]]
-    } else {
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    }
+    else {
       throw new UnsupportedOperationException("Spark version not supported")
     }
   }
@@ -72,7 +162,12 @@ object SparkSQLUtil {
       val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
       CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
         .asInstanceOf[Rule[LogicalPlan]]
-    } else {
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    }
+    else {
       throw new UnsupportedOperationException("Spark version not supported")
     }
   }
@@ -85,6 +180,10 @@ object SparkSQLUtil {
       val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
       CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
         .asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
     } else {
       throw new UnsupportedOperationException("Spark version not supported")
     }
@@ -98,7 +197,11 @@ object SparkSQLUtil {
       val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
       CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
         .asInstanceOf[Rule[LogicalPlan]]
-    } else {
+    } else if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
+      CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    }
+    else {
       throw new UnsupportedOperationException("Spark version not supported")
     }
   }
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index ee635e0..bdacfcd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -320,6 +320,13 @@ object CarbonReflectionUtils {
       ._1.asInstanceOf[RunnableCommand]
   }
 
+  def createSingleObject(className: String): Any = {
+    val classMirror = universe.runtimeMirror(getClass.getClassLoader)
+    val classTest = classMirror.staticModule(className)
+    val methods = classMirror.reflectModule(classTest)
+    methods.instance
+  }
+
   def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
     val clazz = Utils.classForName(className)
     val ctor = clazz.getConstructors.head
diff --git a/integration/spark2/src/main/scala/org/apache/spark/Logging.scala b/integration/spark2/src/main/scala/org/apache/spark/Logging.scala
new file mode 100644
index 0000000..62d6862
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/Logging.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+trait Logging extends org.apache.spark.internal.Logging{
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala
new file mode 100644
index 0000000..6f60d0f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+class SQLConf extends org.apache.spark.sql.internal.SQLConf {
+  val CASE_SENSITIVE = true
+
+  val STARSCHEMA_DETECTION = true
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 69541eb..79a6240 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -25,7 +25,9 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
-import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.types.{DataType, Metadata}
 
@@ -81,4 +83,8 @@ object CarbonToSparkAdapter {
       tablePath: String): CatalogStorageFormat = {
     storageFormat.copy(properties = map, locationUri = Some(tablePath))
   }
+
+  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq(OptimizeCodegen(conf))
+  }
 }
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
new file mode 100644
index 0000000..eb3e88d
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import com.google.common.base.Objects
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+  * A `LogicalPlan` that represents a hive table.
+  *
+  * TODO: remove this after we completely make hive as a data source.
+  */
+case class HiveTableRelation(
+                              tableMeta: CatalogTable,
+                              dataCols: Seq[AttributeReference],
+                              partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
+  assert(tableMeta.identifier.database.isDefined)
+  assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
+  assert(tableMeta.schema.sameType(dataCols.toStructType))
+
+  // The partition column should always appear after data columns.
+  override def output: Seq[AttributeReference] = dataCols ++ partitionCols
+
+  def isPartitioned: Boolean = partitionCols.nonEmpty
+
+  override def equals(relation: Any): Boolean = relation match {
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(tableMeta.identifier, output)
+  }
+
+  override def newInstance(): HiveTableRelation = copy(
+    dataCols = dataCols.map(_.newInstance()),
+    partitionCols = partitionCols.map(_.newInstance()))
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
new file mode 100644
index 0000000..9a88255
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class MigrateOptimizer {
+
+}
+
+/**
+  * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
+  */
+object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Deduplicate(keys, child, streaming) if !streaming =>
+      val keyExprIds = keys.map(_.exprId)
+      val aggCols = child.output.map { attr =>
+        if (keyExprIds.contains(attr.exprId)) {
+          attr
+        } else {
+          Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
+        }
+      }
+      Aggregate(keys, aggCols, child)
+  }
+}
+
+/** A logical plan for `dropDuplicates`. */
+case class Deduplicate(
+                        keys: Seq[Attribute],
+                        child: LogicalPlan,
+                        streaming: Boolean) extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+  * Remove projections from the query plan that do not make any modifications.
+  */
+object RemoveRedundantProject extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case p @ Project(_, child) if p.output == child.output => child
+  }
+}
+
+/**
+  * push down operations into [[CreateNamedStructLike]].
+  */
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformExpressionsUp {
+      // push down field extraction
+      case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) =>
+        createNamedStructLike.valExprs(ordinal)
+    }
+  }
+}
+
+/**
+  * push down operations into [[CreateArray]].
+  */
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformExpressionsUp {
+      // push down field selection (array of structs)
+      case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) =>
+        // instead f selecting the field on the entire array,
+        // select it from each member of the array.
+        // pushing down the operation this way open other optimizations opportunities
+        // (i.e. struct(...,x,...).x)
+        CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name))))
+      // push down item selection.
+      case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+        // instead of creating the array and then selecting one row,
+        // remove array creation altgether.
+        if (idx >= 0 && idx < elems.size) {
+          // valid index
+          elems(idx)
+        } else {
+          // out of bounds, mimic the runtime behavior and return null
+          Literal(null, ga.dataType)
+        }
+    }
+  }
+}
+
+/**
+  * push down operations into [[CreateMap]].
+  */
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformExpressionsUp {
+      case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems)
+    }
+  }
+}
+
+
+/**
+  * Removes MapObjects when the following conditions are satisfied
+  *   1. Mapobject(... lambdavariable(..., false) ...), which means types for input and output
+  *      are primitive types with non-nullable
+  *   2. no custom collection class specified representation of data item.
+  */
+object EliminateMapObjects extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData
+  }
+}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
new file mode 100644
index 0000000..4abf189
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
+  * recognize a subquery as such, and it allows us to write subquery aware transformations.
+  */
+case class Subquery(child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 47feae0..5caa4dd 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -36,13 +36,13 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.spark.util.CarbonReflectionUtils
 
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -301,10 +301,25 @@ class CarbonAnalyzer(catalog: SessionCatalog,
     conf: CatalystConf,
     sparkSession: SparkSession,
     analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+  val mvPlan = try {
+    CarbonReflectionUtils.createObject(
+      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
+      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
+  } catch {
+    case e: Exception =>
+      null
+  }
+
   override def execute(plan: LogicalPlan): LogicalPlan = {
     var logicalPlan = analyzer.execute(plan)
     logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
     CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+    if (mvPlan != null) {
+      mvPlan.apply(logicalPlan)
+    } else {
+      logicalPlan
+    }
   }
 }