You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/27 22:48:48 UTC

[3/4] carbondata git commit: [CARBONDATA-1552][Spark-2.2 Integration] Spark-2.2 Carbon Integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 7c23e5e..1bb6ab0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -19,16 +19,19 @@ package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, InsertIntoCarbonTable, SparkSession}
+import org.apache.spark.sql._
+import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, PredicateSubquery, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.CarbonExpressions.MatchCast
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
@@ -100,7 +103,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // subquery
         case Aggregate(groupingExp,
         aggregateExp,
-        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+        CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
           // only carbon query plan is supported checking whether logical relation is
           // is for carbon
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
@@ -120,7 +123,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // filter expression
         case Aggregate(groupingExp, aggregateExp,
         Filter(filterExp,
-        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
+        CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
           // only carbon query plan is supported checking whether logical relation is
           // is for carbon
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
@@ -134,7 +137,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             tableName,
             list)
           // TODO need to handle filter predicate subquery scenario
-          isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp)
+          // isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp)
           // getting the columns from filter expression
           if(isValidPlan) {
             filterExp.transform {
@@ -187,19 +190,19 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // if it doesnot match with any pre aggregate table return the same plan
         if (!selectedDataMapSchemas.isEmpty) {
           // sort the selected child schema based on size to select smallest pre aggregate table
-          val (aggDataMapSchema, carbonRelation) =
+          val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          val (aggDataMapSchema, carbonRelation, relation) =
             selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
-              val catalog = sparkSession.sessionState.catalog
-              val carbonRelation = catalog
-                .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
-                  .getTableName,
-                  Some(selectedDataMapSchema.getRelationIdentifier
-                    .getDatabaseName))).asInstanceOf[SubqueryAlias].child
-                .asInstanceOf[LogicalRelation]
-              (selectedDataMapSchema, carbonRelation)
-            }.minBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
+              val identifier = TableIdentifier(
+                selectedDataMapSchema.getRelationIdentifier.getTableName,
+                Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
+              val carbonRelation =
+                catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+              val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
+              (selectedDataMapSchema, carbonRelation, relation)
+            }.minBy(f => f._2.sizeInBytes)
           // transform the query plan based on selected child schema
-          transformPreAggQueryPlan(plan, aggDataMapSchema, carbonRelation)
+          transformPreAggQueryPlan(plan, aggDataMapSchema, relation)
         } else {
           plan
         }
@@ -217,7 +220,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * child schema
    * @param attributeReference
    * parent attribute reference
-   * @param childCarbonRelation
+   * @param attributes
    * child logical relation
    * @param aggFunction
    * aggregation function applied on child
@@ -225,7 +228,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    */
   def getChildAttributeReference(dataMapSchema: DataMapSchema,
       attributeReference: AttributeReference,
-      childCarbonRelation: LogicalRelation,
+      attributes: Seq[AttributeReference],
       aggFunction: String = ""): AttributeReference = {
     val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema];
     val columnSchema = if (aggFunction.isEmpty) {
@@ -240,7 +243,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       throw new AnalysisException("Column does not exists in Pre Aggregate table")
     }
     // finding the child attribute from child logical relation
-    childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2
+    attributes.find(p => p.name.equals(columnSchema.getColumnName)).get
   }
 
   /**
@@ -268,14 +271,16 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * parent logical plan
    * @param aggDataMapSchema
    * select data map schema
-   * @param childCarbonRelation
+   * @param childPlan
    * child carbon table relation
    * @return transformed plan
    */
   def transformPreAggQueryPlan(logicalPlan: LogicalPlan,
-      aggDataMapSchema: DataMapSchema, childCarbonRelation: LogicalRelation): LogicalPlan = {
+      aggDataMapSchema: DataMapSchema,
+      childPlan: LogicalPlan): LogicalPlan = {
+    val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
     logicalPlan.transform {
-      case Aggregate(grExp, aggExp, child@SubqueryAlias(_, l: LogicalRelation, _))
+      case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, None) =
@@ -284,13 +289,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             child,
             None,
             aggDataMapSchema,
-            childCarbonRelation)
+            attributes,
+            childPlan)
         Aggregate(updatedGroupExp,
           updatedAggExp,
           newChild)
       case Aggregate(grExp,
       aggExp,
-      Filter(expression, child@SubqueryAlias(_, l: LogicalRelation, _)))
+      Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
@@ -299,7 +305,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             child,
             Some(expression),
             aggDataMapSchema,
-            childCarbonRelation)
+            attributes,
+            childPlan)
         Aggregate(updatedGroupExp,
           updatedAggExp,
           Filter(updatedFilterExpression.get,
@@ -313,7 +320,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             l,
             None,
             aggDataMapSchema,
-            childCarbonRelation)
+            attributes,
+            childPlan)
         Aggregate(updatedGroupExp,
           updatedAggExp,
           newChild)
@@ -339,7 +347,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * filter expression
    * @param aggDataMapSchema
    * pre aggregate table schema
-   * @param childCarbonRelation
+   * @param attributes
    * pre aggregate table logical relation
    * @return tuple of(updated grouping expression,
    *         updated aggregate expression,
@@ -350,13 +358,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       aggregateExpressions: Seq[NamedExpression],
       child: LogicalPlan, filterExpression: Option[Expression] = None,
       aggDataMapSchema: DataMapSchema,
-      childCarbonRelation: LogicalRelation): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
+      attributes: Seq[AttributeReference],
+      aggPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
     Option[Expression]) = {
     // transforming the group by expression attributes with child attributes
     val updatedGroupExp = groupingExpressions.map { exp =>
       exp.transform {
         case attr: AttributeReference =>
-          getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+          getChildAttributeReference(aggDataMapSchema, attr, attributes)
       }
     }
     // below code is for updating the aggregate expression.
@@ -379,7 +388,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       case attr: AttributeReference =>
         val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
           attr,
-          childCarbonRelation)
+          attributes)
         // returning the alias to show proper column name in output
         Alias(childAttributeReference,
           attr.name)(NamedExpression.newExprId,
@@ -388,7 +397,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       case Alias(attr: AttributeReference, name) =>
         val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
           attr,
-          childCarbonRelation)
+          attributes)
         // returning alias with child attribute reference
         Alias(childAttributeReference,
           name)(NamedExpression.newExprId,
@@ -398,7 +407,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // get the updated aggregate aggregate function
         val aggExp = getUpdatedAggregateExpressionForChild(attr,
           aggDataMapSchema,
-          childCarbonRelation)
+          attributes)
         // returning alias with child attribute reference
         Alias(aggExp,
           name)(NamedExpression.newExprId,
@@ -407,16 +416,19 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     // transformaing the logical relation
     val newChild = child.transform {
       case _: LogicalRelation =>
-        childCarbonRelation
+        aggPlan
       case _: SubqueryAlias =>
-        childCarbonRelation
+        aggPlan match {
+          case s: SubqueryAlias => s.child
+          case others => others
+        }
     }
     // updating the filter expression if present
     val updatedFilterExpression = if (filterExpression.isDefined) {
       val filterExp = filterExpression.get
       Some(filterExp.transform {
         case attr: AttributeReference =>
-          getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+          getChildAttributeReference(aggDataMapSchema, attr, attributes)
       })
     } else {
       None
@@ -441,13 +453,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * aggregate expression
    * @param dataMapSchema
    * child data map schema
-   * @param childCarbonRelation
+   * @param attributes
    * child logical relation
    * @return updated expression
    */
   def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
       dataMapSchema: DataMapSchema,
-      childCarbonRelation: LogicalRelation):
+      attributes: Seq[AttributeReference]):
   Expression = {
     aggExp.aggregateFunction match {
       // Change the count AggregateExpression to Sum as count
@@ -456,7 +468,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       case count@Count(Seq(attr: AttributeReference)) =>
         AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           count.prettyName),
           LongType)),
           aggExp.mode,
@@ -464,44 +476,44 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       case sum@Sum(attr: AttributeReference) =>
         AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           sum.prettyName)),
           aggExp.mode,
           isDistinct = false)
       case max@Max(attr: AttributeReference) =>
         AggregateExpression(Max(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           max.prettyName)),
           aggExp.mode,
           isDistinct = false)
       case min@Min(attr: AttributeReference) =>
         AggregateExpression(Min(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           min.prettyName)),
           aggExp.mode,
           isDistinct = false)
-      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           sum.prettyName),
           changeDataType)),
           aggExp.mode,
           isDistinct = false)
-      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         AggregateExpression(Min(Cast(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           min.prettyName),
           changeDataType)),
           aggExp.mode,
           isDistinct = false)
-      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         AggregateExpression(Max(Cast(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           max.prettyName),
           changeDataType)),
           aggExp.mode,
@@ -513,13 +525,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       case Average(attr: AttributeReference) =>
         Divide(AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           "sum")),
           aggExp.mode,
           isDistinct = false),
           AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
             attr,
-            childCarbonRelation,
+            attributes,
             "count"),
             LongType)),
             aggExp.mode,
@@ -527,17 +539,17 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       // In case of average aggregate function select 2 columns from aggregate table
       // with aggregation sum and count.
       // Then add divide(sum(column with sum), sum(column with count)).
-      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         Divide(AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
           attr,
-          childCarbonRelation,
+          attributes,
           "sum"),
           changeDataType)),
           aggExp.mode,
           isDistinct = false),
           AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
             attr,
-            childCarbonRelation,
+            attributes,
             "count"),
             LongType)),
             aggExp.mode,
@@ -632,7 +644,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           carbonTable,
           tableName,
           sum.prettyName))
-      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         Seq(getQueryColumn(attr.name,
           carbonTable,
           tableName,
@@ -649,7 +661,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           carbonTable,
           tableName,
           min.prettyName))
-      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         Seq(getQueryColumn(attr.name,
           carbonTable,
           tableName,
@@ -661,7 +673,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           carbonTable,
           tableName,
           max.prettyName))
-      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         Seq(getQueryColumn(attr.name,
           carbonTable,
           tableName,
@@ -682,7 +694,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         ))
       // in case of average need to return two columns
       // sum and count of the column to added during table creation to support rollup
-      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+      case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
         Seq(getQueryColumn(attr.name,
           carbonTable,
           tableName,
@@ -742,7 +754,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
 /**
  * Insert into carbon table from other source
  */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
+case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transform {
       // Wait until children are resolved.
@@ -781,12 +793,22 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
           case attr => attr
         }
       }
+      val version = sparkSession.version
       val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
-        p.child
+        if (version.startsWith("2.1")) {
+          CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
+        } else if (version.startsWith("2.2")) {
+          CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
+        } else {
+          throw new UnsupportedOperationException(s"Spark version $version is not supported")
+        }
       } else {
         Project(newChildOutput, childPlan)
       }
-      InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
+
+      val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
+
+      InsertIntoCarbonTable(relation, p.partition, newChild, overwrite, true)
     } else {
       CarbonException.analysisException(
         "Cannot insert into target table because number of columns mismatch")
@@ -812,7 +834,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
                   Alias(attrExpression
                     .copy(aggregateFunction = Count(attr),
                       resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case Average(cast@Cast(attr: AttributeReference, _)) =>
+              case Average(cast@MatchCast(attr: AttributeReference, _)) =>
                 Seq(Alias(attrExpression
                   .copy(aggregateFunction = Sum(cast),
                     resultId = NamedExpression.newExprId),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 69b8d50..aadce98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -171,7 +171,13 @@ case class CarbonRelation(
   }
 
   // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+  // TODO: Spark 2.1 variant:
+  //  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+  // TODO: Spark 2.2 variant:
+  //  override def computeStats(conf: SQLConf): Statistics = Statistics(sizeInBytes =
+  //  this.sizeInBytes)
+
+  // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
 
   override def equals(other: Any): Boolean = {
     other match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index b0aecd7..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
-import org.apache.spark.sql.execution.command.preaaggregate._
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events._
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    sparkSession: SparkSession,
-    functionResourceLoader: FunctionResourceLoader,
-    functionRegistry: FunctionRegistry,
-    conf: SQLConf,
-    hadoopConf: Configuration)
-  extends HiveSessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
-    sparkSession,
-    functionResourceLoader,
-    functionRegistry,
-    conf,
-    hadoopConf) {
-
-  lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-
-  /**
-   * This method will invalidate carbonrelation from cache if carbon table is updated in
-   * carbon catalog
-   *
-   * @param name
-   * @param alias
-   * @return
-   */
-  override def lookupRelation(name: TableIdentifier,
-      alias: Option[String]): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name, alias)
-    var toRefreshRelation = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-          LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
-          _) =>
-        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
-      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
-      case _ =>
-    }
-
-    if (toRefreshRelation) {
-      super.lookupRelation(name, alias)
-    } else {
-      rtnRelation
-    }
-  }
-
-  private def refreshRelationFromCache(identifier: TableIdentifier,
-      alias: Option[String],
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
-    var isRefreshed = false
-    val storePath = CarbonProperties.getStorePath
-    carbonEnv.carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
-
-    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
-      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
-      carbonDatasourceHadoopRelation.carbonTable.getTableName)
-    if (table.isEmpty || (table.isDefined &&
-        table.get.getTableLastUpdatedTime !=
-          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
-      refreshTable(identifier)
-      DataMapStoreManager.getInstance().
-        clearDataMaps(AbsoluteTableIdentifier.from(storePath,
-          identifier.database.getOrElse("default"), identifier.table))
-      isRefreshed = true
-      logInfo(s"Schema changes have been detected for table: $identifier")
-    }
-    isRefreshed
-  }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- * @param sparkSession
- */
-class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies =
-    Seq(
-      new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  override lazy val analyzer: Analyzer = {
-    new Analyzer(catalog, conf) {
-      override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.OrcConversions ::
-        CarbonPreInsertionCasts ::
-        CarbonPreAggregateQueryRules(sparkSession) ::
-        CarbonIUDAnalysisRule(sparkSession) ::
-        AnalyzeCreateTable(sparkSession) ::
-        PreprocessTableInsertion(conf) ::
-        DataSourceAnalysis(conf) ::
-        (if (conf.runSQLonFile) {
-          new ResolveDataSource(sparkSession) :: Nil
-        } else {  Nil }
-           )
-
-      override val extendedCheckRules = Seq(
-        PreWriteCheck(conf, catalog))
-    }
-  }
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-    new CarbonSessionCatalog(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-      sparkSession.sharedState.globalTempViewManager,
-      sparkSession,
-      functionResourceLoader,
-      functionRegistry,
-      conf,
-      newHadoopConf())
-  }
-}
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
-    // optimize whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case p: PredicateSubquery =>
-            val tPlan = p.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
-        }
-    }
-    super.execute(transFormedPlan)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
deleted file mode 100644
index 6c91e7e..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.internal
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-      SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To enable/ disable carbon custom block distribution.")
-        .booleanConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
-    val BAD_RECORDS_LOGGER_ENABLE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BATCH_SORT_SIZE_INMB =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
-        .doc("Property to specify batch sort size in MB.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    val SINGLE_PASS =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
-        .doc("Property to enable/disable single_pass.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    val BAD_RECORD_PATH =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-
-  /**
-   * to set the dynamic properties default values
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index dbc807d..75cc128 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonContainsWith
 import org.apache.spark.sql.CarbonEndsWith
+import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -42,7 +43,6 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  */
 object CarbonFilters {
 
-
   /**
    * Converts data sources filters to carbon filter predicates.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 0187489..ee2c422 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,10 +20,9 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{AnalysisException, DeleteRecords, ShowLoadsCommand, UpdateTable}
+import org.apache.spark.sql.{AnalysisException, DeleteRecords, ShowLoadsCommand, SparkSession, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
@@ -31,7 +30,10 @@ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCo
 import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
@@ -164,15 +166,27 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   protected lazy val deleteRecords: Parser[LogicalPlan] =
-    (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
+    (DELETE ~> FROM ~> aliasTable) ~ restInput.? <~ opt(";") ^^ {
       case table ~ rest =>
-        val tableName = getTableName(table.tableIdentifier)
-        val alias = table.alias.getOrElse("")
-        DeleteRecords("select tupleId from " + tableName + " " + alias + rest.getOrElse(""), table)
+        val tableName = getTableName(table._2)
+        val relation: LogicalPlan = table._3 match {
+          case Some(a) =>
+            DeleteRecords(
+              "select tupleId from " + tableName + " " + table._3.getOrElse("")
+                + rest.getOrElse(""),
+              Some(table._3.get),
+              table._1)
+          case None =>
+            DeleteRecords(
+              "select tupleId from " + tableName + " " + rest.getOrElse(""),
+              None,
+              table._1)
+        }
+        relation
     }
 
   protected lazy val updateTable: Parser[LogicalPlan] =
-    UPDATE ~> table ~
+    UPDATE ~> aliasTable ~
     (SET ~> "(" ~> repsep(element, ",") <~ ")") ~
     ("=" ~> restInput) <~ opt(";") ^^ {
       case tab ~ columns ~ rest =>
@@ -184,31 +198,58 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             }
             // only list of expression are given, need to convert that list of expressions into
             // select statement on destination table
-            val relation = tab match {
-              case r@UnresolvedRelation(tableIdentifier, alias) =>
-                updateRelation(r, tableIdentifier, alias)
-              case _ => tab
+            val relation : UnresolvedRelation = tab._1 match {
+              case r@CarbonUnresolvedRelation(tableIdentifier) =>
+                tab._3 match {
+                  case Some(a) => (updateRelation(r, tableIdentifier, tab._4, Some(tab._3.get)))
+                  case None => (updateRelation(r, tableIdentifier, tab._4, None))
+                }
+              case _ => tab._1
+            }
+
+            tab._3 match {
+              case Some(a) =>
+                ("select " + sel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
+              case None =>
+                ("select " + sel + " from " + getTableName(tab._2), relation)
             }
-            ("select " + sel + " from " + getTableName(relation.tableIdentifier) + " " +
-             relation.alias.get, relation)
+
           } else {
-            (sel, updateRelation(tab, tab.tableIdentifier, tab.alias))
+            (sel, updateRelation(tab._1, tab._2, tab._4, Some(tab._3.get)))
           }
-        UpdateTable(relation, columns, selectStmt, where)
+        val rel = tab._3 match {
+          case Some(a) => UpdateTable(relation, columns, selectStmt, Some(tab._3.get), where)
+          case None => UpdateTable(relation,
+            columns,
+            selectStmt,
+            Some(tab._1.tableIdentifier.table),
+            where)
+        }
+        rel
     }
 
+
+
   private def updateRelation(
       r: UnresolvedRelation,
-      tableIdentifier: Seq[String],
+      tableIdent: Seq[String],
+      tableIdentifier: TableIdentifier,
       alias: Option[String]): UnresolvedRelation = {
     alias match {
       case Some(_) => r
       case _ =>
-        val tableAlias = tableIdentifier match {
+        val tableAlias = tableIdent match {
           case Seq(dbName, tableName) => Some(tableName)
           case Seq(tableName) => Some(tableName)
         }
-        UnresolvedRelation(tableIdentifier, tableAlias)
+        // Create the UnresolvedRelation through reflection so that this code works on both
+        // Spark 2.1 and Spark 2.2 (replaces UnresolvedRelation(tableIdentifier, tableAlias)).
+        val unresolvedrelation =
+        CarbonReflectionUtils.getUnresolvedRelation(
+          tableIdentifier,
+          SparkSession.getActiveSession.get.version,
+          tableAlias)
+        unresolvedrelation
     }
   }
 
@@ -219,7 +260,26 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   protected lazy val table: Parser[UnresolvedRelation] = {
     rep1sep(attributeName, ".") ~ opt(ident) ^^ {
-      case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
+      case tableIdent ~ alias => UnresolvedRelation(tableIdent)
+    }
+  }
+
+  protected lazy val aliasTable: Parser[(UnresolvedRelation, List[String], Option[String],
+    TableIdentifier)] = {
+    rep1sep(attributeName, ".") ~ opt(ident) ^^ {
+      case tableIdent ~ alias =>
+
+        val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
+
+        // Create the UnresolvedRelation through reflection so that this code works on both
+        // Spark 2.1 and Spark 2.2 (replaces UnresolvedRelation(tableIdent, alias)).
+        val unresolvedRelation =
+        CarbonReflectionUtils.getUnresolvedRelation(
+          tableIdentifier,
+          SparkSession.getActiveSession.get.version,
+          alias)
+
+        (unresolvedRelation, tableIdent, alias, tableIdentifier)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 8e53927..ec20c49 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,17 +18,17 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{CarbonCreateTableCommand, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -40,7 +40,8 @@ import org.apache.carbondata.spark.util.CommonUtil
  */
 class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
 
-  val astBuilder = new CarbonSqlAstBuilder(conf)
+  val parser = new CarbonSpark2SqlParser
+  val astBuilder = CarbonReflectionUtils.getAstBuilder(conf, parser, sparkSession)
 
   private val substitutor = new VariableSubstitution(conf)
 
@@ -53,7 +54,7 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
         throw ce
       case ex =>
         try {
-          astBuilder.parser.parse(sqlText)
+          parser.parse(sqlText)
         } catch {
           case mce: MalformedCarbonCommandException =>
             throw mce
@@ -73,13 +74,11 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
   }
 }
 
-class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
-
-  val parser = new CarbonSpark2SqlParser
+class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
+  extends SparkSqlAstBuilder(conf) {
 
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    Option(ctx.query()).map(plan)
-    val fileStorage = Option(ctx.createFileFormat) match {
+  def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
+    Option(createFileFormat) match {
       case Some(value) =>
         if (value.children.get(1).getText.equalsIgnoreCase("by")) {
           value.storageHandler().STRING().getSymbol.getText
@@ -89,63 +88,6 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         }
       case _ => ""
     }
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
-      // TODO: implement temporary tables
-      if (temp) {
-        throw new ParseException(
-          "CREATE TEMPORARY TABLE is not supported yet. " +
-          "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
-      }
-      if (ctx.skewSpec != null) {
-        operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
-      }
-      if (ctx.bucketSpec != null) {
-        operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
-      }
-
-      // validate schema
-      val (colsStructFields, colNames) = validateSchema(ctx, name)
-
-      val tableProperties = mutable.Map[String, String]()
-      val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
-        .getOrElse(Map.empty)
-      properties.foreach{property => tableProperties.put(property._1, property._2)}
-
-      val options = new CarbonOption(properties)
-
-      // validate streaming table property
-      validateStreamingProperty(ctx, options)
-
-      // validate partition clause
-      val (partitionByStructFields, partitionFields) =
-        validateParitionFields(ctx, colNames, tableProperties)
-
-      val fields = parser.getFields(colsStructFields ++ partitionByStructFields)
-
-      // validate bucket fields
-      val bucketFields: Option[BucketFields] =
-        parser.getBucketFields(tableProperties, fields, options)
-
-      val tableComment = Option(ctx.STRING()).map(string)
-
-      // prepare table model of the collected tokens
-      val tableModel: TableModel = parser.prepareTableModel(
-        ifNotExists,
-        convertDbNameToLowerCase(name.database),
-        name.table.toLowerCase,
-        fields,
-        partitionFields,
-        tableProperties,
-        bucketFields,
-        isAlterFlow = false,
-        tableComment)
-
-      CarbonCreateTableCommand(tableModel)
-    } else {
-      super.visitCreateTable(ctx)
-    }
   }
 
   /**
@@ -154,17 +96,24 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
    * @param dbName
    * @return Option of String
    */
-  protected def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
+  def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
     dbName match {
       case Some(databaseName) => Some(databaseName.toLowerCase)
       case None => dbName
     }
   }
 
+
+
+  def needToConvertToLowerCase(key: String): Boolean = {
+    val noConvertList = Array("LIST_INFO", "RANGE_INFO")
+    !noConvertList.exists(x => x.equalsIgnoreCase(key));
+  }
+
   /**
    * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
    */
-  private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+  def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
     val props = visitTablePropertyList(ctx)
     val badKeys = props.filter { case (_, v) => v == null }.keys
     if (badKeys.nonEmpty) {
@@ -180,52 +129,93 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
     }
   }
 
-  private def needToConvertToLowerCase(key: String): Boolean = {
-    val noConvertList = Array("LIST_INFO", "RANGE_INFO")
-    !noConvertList.exists(x => x.equalsIgnoreCase(key))
+  def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String]
+  = {
+    Option(ctx).map(visitPropertyKeyValues)
+      .getOrElse(Map.empty)
   }
 
-  private def validateParitionFields(
-      ctx: CreateTableContext,
-      colNames: Seq[String],
-      tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
-    val partitionByStructFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
-    val partitionerFields = partitionByStructFields.map { structField =>
-      PartitionerField(structField.name, Some(structField.dataType.toString), null)
+  def createCarbonTable(tableHeader: CreateTableHeaderContext,
+      skewSpecContext: SkewSpecContext,
+      bucketSpecContext: BucketSpecContext,
+      partitionColumns: ColTypeListContext,
+      columns : ColTypeListContext,
+      tablePropertyList : TablePropertyListContext,
+      tableComment : Option[String]) : LogicalPlan = {
+    // val parser = new CarbonSpark2SqlParser
+
+    val (name, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
+    // TODO: implement temporary tables
+    if (temp) {
+      throw new ParseException(
+        "CREATE TEMPORARY TABLE is not supported yet. " +
+        "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
     }
-    if (partitionerFields.nonEmpty) {
-      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
-        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
-      }
-      // partition columns should not be part of the schema
-      val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
-                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
-      }
+    if (skewSpecContext != null) {
+      operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
+    }
+    if (bucketSpecContext != null) {
+      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
     }
-    (partitionByStructFields, partitionerFields)
-  }
 
-  private def validateSchema(
-      ctx: CreateTableContext,
-      name: TableIdentifier): (Seq[StructField], Seq[String]) = {
-    // Validate schema, ensuring whether no duplicate name is used in table definition
-    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+
+    val cols = Option(columns).toSeq.flatMap(visitColTypeList)
+    val properties = getPropertyKeyValues(tablePropertyList)
+
+    // Ensure that no duplicate column name is used in the table definition
     val colNames = cols.map(_.name)
     if (colNames.length != colNames.distinct.length) {
       val duplicateColumns = colNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }
       operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-                          duplicateColumns.mkString("[", ",", "]"), ctx)
+                          duplicateColumns.mkString("[", ",", "]"), columns)
     }
-    (cols, colNames)
+
+
+
+    val tableProperties = mutable.Map[String, String]()
+    properties.foreach{property => tableProperties.put(property._1, property._2)}
+
+    // validate partition clause
+    val (partitionByStructFields, partitionFields) =
+      validateParitionFields(partitionColumns, colNames, tableProperties)
+
+    // validate partition clause again (validateParitionFields already ran these checks)
+    if (partitionFields.nonEmpty) {
+      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) {
+        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+      }
+      // partition columns should not be part of the schema
+      val badPartCols = partitionFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+      if (badPartCols.nonEmpty) {
+        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
+                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"),
+          partitionColumns)
+      }
+    }
+    val fields = parser.getFields(cols ++ partitionByStructFields)
+    val options = new CarbonOption(properties)
+    // validate bucket fields from the table properties
+    val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+
+    validateStreamingProperty(options)
+
+    // prepare table model of the collected tokens
+    val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
+      convertDbNameToLowerCase(name.database),
+      name.table.toLowerCase,
+      fields,
+      partitionFields,
+      tableProperties,
+      bucketFields,
+      isAlterFlow = false,
+      tableComment)
+
+    CarbonCreateTableCommand(tableModel)
   }
 
-  private def validateStreamingProperty(
-      ctx: CreateTableContext,
-      carbonOption: CarbonOption): Unit = {
+  private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
     try {
       carbonOption.isStreaming
     } catch {
@@ -234,4 +224,34 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
           "Table property 'streaming' should be either 'true' or 'false'")
     }
   }
+
+  private def validateParitionFields(
+      partitionColumns: ColTypeListContext,
+      colNames: Seq[String],
+      tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
+    val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
+    val partitionerFields = partitionByStructFields.map { structField =>
+      PartitionerField(structField.name, Some(structField.dataType.toString), null)
+    }
+    if (partitionerFields.nonEmpty) {
+      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
+        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+      }
+      // partition columns should not be part of the schema
+      val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+      if (badPartCols.nonEmpty) {
+        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
+                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]")
+          , partitionColumns: ColTypeListContext)
+      }
+    }
+    (partitionByStructFields, partitionerFields)
+  }
+
+}
+
+trait CarbonAstTrait {
+  def getFileStorage (createFileFormat : CreateFileFormatContext): String
 }
+
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 07491d1..a1742de 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -23,9 +23,9 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -130,8 +130,7 @@ object AlterTableUtil {
    */
   def updateSchemaInfo(carbonTable: CarbonTable,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      thriftTable: TableInfo)(sparkSession: SparkSession,
-      sessionState: CarbonSessionState): Unit = {
+      thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -145,8 +144,9 @@ object AlterTableUtil {
     val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession).schema.json
     val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
-    sessionState.metadataHive.runSqlHive(
-      s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
+    val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
 
@@ -329,7 +329,7 @@ object AlterTableUtil {
    */
   def modifyTableComment(tableIdentifier: TableIdentifier, properties: Map[String, String],
                          propKeys: Seq[String], set: Boolean)
-                        (sparkSession: SparkSession, sessionState: CarbonSessionState): Unit = {
+                        (sparkSession: SparkSession): Unit = {
     val tableName = tableIdentifier.table
     val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table comment request has been received for $dbName.$tableName")
@@ -380,7 +380,7 @@ object AlterTableUtil {
       }
       updateSchemaInfo(carbonTable,
         schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
-        thriftTable)(sparkSession, sessionState)
+        thriftTable)(sparkSession)
       LOGGER.info(s"Alter table comment is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table comment is successful for table $dbName.$tableName")
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
new file mode 100644
index 0000000..837b21f
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Declares CarbonData's dynamic properties as Spark SQL config entries (key, usage doc and
+ * default value) and seeds a session's runtime conf with the same defaults.
+ *
+ * @param sparkSession session whose runtime conf receives the defaults
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * Builds a config entry (key, usage doc, default value) for each dynamic carbon property.
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+      SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To enable/ disable carbon custom block distribution.")
+        .booleanConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    val BAD_RECORDS_LOGGER_ENABLE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide whether empty data to be considered bad/ good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
+      "carbon.input.segments.<database_name>.<table_name>")
+      .doc("Property to configure the list of segments to query.").stringConf
+      .createWithDefault(carbonProperties
+        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+  }
+  /**
+   * Sets the default value of each dynamic carbon property on the session's runtime conf.
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
new file mode 100644
index 0000000..eadae6a
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
+import org.apache.spark.sql.execution.command.preaaggregate._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events._
+
+/**
+ * Hive session catalog that looks a carbon relation up again when the table held by the carbon
+ * metastore no longer matches the table captured in the looked-up relation.
+ *
+ * @param externalCatalog passed through to [[HiveSessionCatalog]]
+ * @param globalTempViewManager passed through to [[HiveSessionCatalog]]
+ * @param sparkSession passed through to [[HiveSessionCatalog]]; also initialises `carbonEnv`
+ * @param functionResourceLoader passed through to [[HiveSessionCatalog]]
+ * @param functionRegistry passed through to [[HiveSessionCatalog]]
+ * @param conf passed through to [[HiveSessionCatalog]]
+ * @param hadoopConf passed through to [[HiveSessionCatalog]]
+ */
+class CarbonSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    sparkSession: SparkSession,
+    functionResourceLoader: FunctionResourceLoader,
+    functionRegistry: FunctionRegistry,
+    conf: SQLConf,
+    hadoopConf: Configuration)
+  extends HiveSessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    sparkSession,
+    functionResourceLoader,
+    functionRegistry,
+    conf,
+    hadoopConf) {
+
+  /** Carbon environment for this catalog, initialised with the session on first access. */
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  /**
+   * Looks up the relation and, if it is carbon-backed and its table has changed in the
+   * carbon catalog, invalidates the cached relation and looks it up again.
+   *
+   * @param name identifier of the table to resolve
+   * @param alias optional alias, forwarded to the parent lookup
+   * @return the resolved plan, looked up a second time when a refresh was performed
+   */
+  override def lookupRelation(name: TableIdentifier,
+      alias: Option[String]): LogicalPlan = {
+    val cachedPlan = super.lookupRelation(name, alias)
+    // Only carbon-backed relations (bare or wrapped in an alias) are checked for staleness.
+    val isStale = cachedPlan match {
+      case SubqueryAlias(_, LogicalRelation(rel: CarbonDatasourceHadoopRelation, _, _), _) =>
+        refreshRelationFromCache(name, alias, rel)
+      case LogicalRelation(rel: CarbonDatasourceHadoopRelation, _, _) =>
+        refreshRelationFromCache(name, alias, rel)
+      case _ => false
+    }
+    // After a refresh the lookup is repeated so that the reloaded relation is returned.
+    if (isStale) super.lookupRelation(name, alias) else cachedPlan
+  }
+
+  /**
+   * Reloads the carbon metastore and, when the relation's table is missing from the metadata
+   * cache or has a different last-updated time, refreshes the table and clears its datamaps.
+   *
+   * @return true when a refresh was performed
+   */
+  private def refreshRelationFromCache(identifier: TableIdentifier,
+      alias: Option[String],
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+    val storePath = CarbonProperties.getStorePath
+    carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    val relationTable = carbonDatasourceHadoopRelation.carbonTable
+    val cachedTable = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+      relationTable.getDatabaseName, relationTable.getTableName)
+    // Up to date only when the cache holds the table with the same last-updated time.
+    val upToDate =
+      cachedTable.exists(_.getTableLastUpdatedTime == relationTable.getTableLastUpdatedTime)
+    if (!upToDate) {
+      refreshTable(identifier)
+      // NOTE(review): falls back to "default" rather than the current database - confirm.
+      DataMapStoreManager.getInstance().clearDataMaps(
+        AbsoluteTableIdentifier.from(
+          storePath, identifier.database.getOrElse("default"), identifier.table))
+      logInfo(s"Schema changes have been detected for table: $identifier")
+    }
+    !upToDate
+  }
+}
+
+/**
+ * Hive session state customised for CarbonData: carbon SQL parser, extra planner strategies
+ * and optimizer rule, carbon optimizer, analyzer rules and session catalog.
+ *
+ * @param sparkSession session this state is created for
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+  /** Carbon SQL parser, built from this state's conf and session. */
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  // These assignments are part of the constructor, so the carbon strategies and optimizer
+  // rule are registered as soon as the session state is instantiated.
+  experimentalMethods.extraStrategies = Seq(
+    new StreamingTableStrategy(sparkSession),
+    new CarbonLateDecodeStrategy,
+    new DDLStrategy(sparkSession))
+  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  /** Analyzer whose extended resolution rules include the carbon specific rules. */
+  override lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, conf) {
+      override val extendedResolutionRules =
+        catalog.ParquetConversions ::
+        catalog.OrcConversions ::
+        CarbonPreInsertionCasts(sparkSession) ::
+        CarbonPreAggregateQueryRules(sparkSession) ::
+        CarbonIUDAnalysisRule(sparkSession) ::
+        AnalyzeCreateTable(sparkSession) ::
+        PreprocessTableInsertion(conf) ::
+        DataSourceAnalysis(conf) ::
+        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
+
+      override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
+    }
+  }
+
+  /**
+   * Carbon session catalog built on the shared external catalog and global temp view manager.
+   */
+  override lazy val catalog = {
+    new CarbonSessionCatalog(
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+      sparkSession.sharedState.globalTempViewManager,
+      sparkSession,
+      functionResourceLoader,
+      functionRegistry,
+      conf,
+      newHadoopConf())
+  }
+}
+
+/**
+ * Spark optimizer that flags carbon relations referenced by filter subqueries before optimizing.
+ */
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    // Carbon relations inside scalar/predicate subqueries of a filter are flagged so that the
+    // optimizer rule skips the decoder plan for them; the whole plan is then optimized at once.
+    val flaggedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            ScalarSubquery(flagCarbonRelations(s.plan), s.children, s.exprId)
+          case p: PredicateSubquery =>
+            PredicateSubquery(flagCarbonRelations(p.plan), p.children, p.nullAware, p.exprId)
+        }
+    }
+    super.execute(flaggedPlan)
+  }
+
+  /**
+   * Adds `true` to the `isSubquery` flags of every carbon relation found in the sub-plan.
+   */
+  private def flagCarbonRelations(subPlan: LogicalPlan): LogicalPlan = subPlan.transform {
+    case lr @ LogicalRelation(carbon: CarbonDatasourceHadoopRelation, _, _) =>
+      carbon.isSubquery += true
+      lr
+  }
+}
+
+/**
+ * AST builder that hands CREATE TABLE statements stored as carbondata to the carbon helper
+ * and delegates every other CREATE TABLE statement to [[SparkSqlAstBuilder]].
+ */
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
+  extends SparkSqlAstBuilder(conf) {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+    // Quoted storage names, compared case-insensitively, that select the carbon code path.
+    val carbonFormats = Seq("'carbondata'", "'org.apache.carbondata.format'")
+    if (carbonFormats.exists(fileStorage.equalsIgnoreCase(_))) {
+      helper.createCarbonTable(
+        ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec, ctx.partitionColumns,
+        ctx.columns, ctx.tablePropertyList, Option(ctx.STRING()).map(string))
+    } else {
+      super.visitCreateTable(ctx)
+    }
+  }
+}