You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:27 UTC

[09/50] [abbrv] carbondata git commit: [CARBONDATA-1987] Make package name and directory paths consistent; remove duplicate file CarbonColumnValidator

[CARBONDATA-1987] Make package name and directory paths consistent;remove duplicate file CarbonColumnValidator

add coveralls token to spark-2.2 profile;synchronize file path and package name;Delete duplicate class CarbonColumnValidator present in spark2 module

This closes #1764


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4d3f3989
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4d3f3989
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4d3f3989

Branch: refs/heads/fgdatamap
Commit: 4d3f3989b5b3aef6ed44e3c67c4102bea4505013
Parents: 94011c3
Author: Raghunandan S <ca...@gmail.com>
Authored: Thu Jan 4 20:18:07 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Jan 31 08:19:22 2018 +0530

----------------------------------------------------------------------
 .../spark/CarbonColumnValidator.scala           |  36 --
 .../src/main/spark2.1/CarbonSQLConf.scala       | 149 -------
 .../src/main/spark2.1/CarbonSessionState.scala  | 339 ----------------
 .../apache/spark/sql/hive/CarbonSQLConf.scala   | 149 +++++++
 .../spark/sql/hive/CarbonSessionState.scala     | 339 ++++++++++++++++
 .../src/main/spark2.2/CarbonSessionState.scala  | 398 -------------------
 .../src/main/spark2.2/CarbonSqlConf.scala       | 148 -------
 .../spark/sql/hive/CarbonSessionState.scala     | 398 +++++++++++++++++++
 .../apache/spark/sql/hive/CarbonSqlConf.scala   | 148 +++++++
 pom.xml                                         |   8 +-
 10 files changed, 1039 insertions(+), 1073 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
deleted file mode 100644
index 03c4764..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
- /**
-  * Carbon column validator
-  */
-class CarbonColumnValidator extends ColumnValidator {
-  def validateColumns(allColumns: Seq[ColumnSchema]): Unit = {
-    allColumns.foreach { columnSchema =>
-      val colWithSameId = allColumns.filter { x =>
-        x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
-      }
-      if (colWithSameId.size > 1) {
-        throw new MalformedCarbonCommandException("Two column can not have same columnId")
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
deleted file mode 100644
index 15ccb0c..0000000
--- a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-        SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To set carbon task distribution.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    val BAD_RECORDS_LOGGER_ENABLE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BATCH_SORT_SIZE_INMB =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
-        .doc("Property to specify batch sort size in MB.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    val SINGLE_PASS =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
-        .doc("Property to enable/disable single_pass.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    val BAD_RECORD_PATH =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-  /**
-   * to set the dynamic properties default values
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
deleted file mode 100644
index 0fe0f96..0000000
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
-
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    sparkSession: SparkSession,
-    functionResourceLoader: FunctionResourceLoader,
-    functionRegistry: FunctionRegistry,
-    conf: SQLConf,
-    hadoopConf: Configuration)
-  extends HiveSessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
-    sparkSession,
-    functionResourceLoader,
-    functionRegistry,
-    conf,
-    hadoopConf) {
-
-  lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
-
-  /**
-   * This method will invalidate carbonrelation from cache if carbon table is updated in
-   * carbon catalog
-   *
-   * @param name
-   * @param alias
-   * @return
-   */
-  override def lookupRelation(name: TableIdentifier,
-      alias: Option[String]): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name, alias)
-    var toRefreshRelation = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
-        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
-      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
-      case _ =>
-    }
-
-    if (toRefreshRelation) {
-      super.lookupRelation(name, alias)
-    } else {
-      rtnRelation
-    }
-  }
-
-  private def refreshRelationFromCache(identifier: TableIdentifier,
-      alias: Option[String],
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
-    var isRefreshed = false
-    val storePath = CarbonProperties.getStorePath
-    carbonEnv.carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTable(identifier)
-
-    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
-      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
-      carbonDatasourceHadoopRelation.carbonTable.getTableName)
-    if (table.isEmpty || (table.isDefined &&
-        table.get.getTableLastUpdatedTime !=
-          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
-      refreshTable(identifier)
-      DataMapStoreManager.getInstance().
-        clearDataMaps(AbsoluteTableIdentifier.from(storePath,
-          identifier.database.getOrElse("default"), identifier.table))
-      isRefreshed = true
-      logInfo(s"Schema changes have been detected for table: $identifier")
-    }
-    isRefreshed
-  }
-
-  /**
-   * returns hive client from session state
-   *
-   * @return
-   */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      // Get the properties from thread local
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      if (carbonSessionInfo != null) {
-        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
-        super.createPartitions(tableName, updatedParts, ignoreIfExists)
-      } else {
-        super.createPartitions(tableName, parts, ignoreIfExists)
-      }
-    } catch {
-      case e: Exception =>
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
-    val partitionSchema = catalogTable.partitionSchema
-    if (partitionFilters.nonEmpty) {
-      val boundPredicate =
-        InterpretedPredicate.create(partitionFilters.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
-      allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
-    } else {
-      allPartitions
-    }
-  }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- * @param sparkSession
- */
-class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies = extraStrategies
-
-  experimentalMethods.extraOptimizations = extraOptimizations
-
-  def extraStrategies: Seq[Strategy] = {
-    Seq(
-      new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  }
-
-  def extraOptimizations: Seq[Rule[LogicalPlan]] = {
-    Seq(new CarbonIUDRule,
-      new CarbonUDFTransformRule,
-      new CarbonLateDecodeRule)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
-  def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
-    catalog.ParquetConversions ::
-    catalog.OrcConversions ::
-    CarbonPreInsertionCasts(sparkSession) ::
-    CarbonIUDAnalysisRule(sparkSession) ::
-    AnalyzeCreateTable(sparkSession) ::
-    PreprocessTableInsertion(conf) ::
-    DataSourceAnalysis(conf) ::
-    (if (conf.runSQLonFile) {
-      new ResolveDataSource(sparkSession) :: Nil
-    } else {  Nil })
-  }
-
-  override lazy val analyzer: Analyzer =
-    new CarbonAnalyzer(catalog, conf, sparkSession,
-      new Analyzer(catalog, conf) {
-        override val extendedResolutionRules =
-          if (extendedAnalyzerRules.nonEmpty) {
-            extendedAnalyzerRules ++ internalAnalyzerRules
-          } else {
-            internalAnalyzerRules
-          }
-        override val extendedCheckRules = Seq(
-          PreWriteCheck(conf, catalog))
-      }
-  )
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-    new CarbonSessionCatalog(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-      sparkSession.sharedState.globalTempViewManager,
-      sparkSession,
-      functionResourceLoader,
-      functionRegistry,
-      conf,
-      newHadoopConf())
-  }
-}
-
-class CarbonAnalyzer(catalog: SessionCatalog,
-    conf: CatalystConf,
-    sparkSession: SparkSession,
-    analyzer: Analyzer) extends Analyzer(catalog, conf) {
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    var logicalPlan = analyzer.execute(plan)
-    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
-    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
-  }
-}
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
-    super.execute(transFormedPlan)
-  }
-}
-
-object CarbonOptimizerUtil {
-  def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
-    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule,
-    // And optimize whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case p: PredicateSubquery =>
-            val tPlan = p.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
-        }
-    }
-    transFormedPlan
-  }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(
-        tableHeader = ctx.createTableHeader,
-        skewSpecContext = ctx.skewSpec,
-        bucketSpecContext = ctx.bucketSpec,
-        partitionColumns = ctx.partitionColumns,
-        columns = ctx.columns,
-        tablePropertyList = ctx.tablePropertyList,
-        locationSpecContext = ctx.locationSpec(),
-        tableComment = Option(ctx.STRING()).map(string),
-        ctas = ctx.AS,
-        query = ctx.query)
-    } else {
-      super.visitCreateTable(ctx)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
new file mode 100644
index 0000000..15ccb0c
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * To initialize dynamic values default param
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * To initialize dynamic param defaults along with usage docs
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+        SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To set carbon task distribution.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    val BAD_RECORDS_LOGGER_ENABLE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide weather empty data to be considered bad/ good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
+      "carbon.input.segments.<database_name>.<table_name>")
+      .doc("Property to configure the list of segments to query.").stringConf
+      .createWithDefault(carbonProperties
+        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+  }
+  /**
+   * to set the dynamic properties default values
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..0fe0f96
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    sparkSession: SparkSession,
+    functionResourceLoader: FunctionResourceLoader,
+    functionRegistry: FunctionRegistry,
+    conf: SQLConf,
+    hadoopConf: Configuration)
+  extends HiveSessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    sparkSession,
+    functionResourceLoader,
+    functionRegistry,
+    conf,
+    hadoopConf) {
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  /**
+   * This method will invalidate carbonrelation from cache if carbon table is updated in
+   * carbon catalog
+   *
+   * @param name
+   * @param alias
+   * @return
+   */
+  override def lookupRelation(name: TableIdentifier,
+      alias: Option[String]): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name, alias)
+    var toRefreshRelation = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
+        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+      case _ =>
+    }
+
+    if (toRefreshRelation) {
+      super.lookupRelation(name, alias)
+    } else {
+      rtnRelation
+    }
+  }
+
+  private def refreshRelationFromCache(identifier: TableIdentifier,
+      alias: Option[String],
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+    var isRefreshed = false
+    val storePath = CarbonProperties.getStorePath
+    carbonEnv.carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTable(identifier)
+
+    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+      carbonDatasourceHadoopRelation.carbonTable.getTableName)
+    if (table.isEmpty || (table.isDefined &&
+        table.get.getTableLastUpdatedTime !=
+          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+      refreshTable(identifier)
+      DataMapStoreManager.getInstance().
+        clearDataMaps(AbsoluteTableIdentifier.from(storePath,
+          identifier.database.getOrElse("default"), identifier.table))
+      isRefreshed = true
+      logInfo(s"Schema changes have been detected for table: $identifier")
+    }
+    isRefreshed
+  }
+
+  /**
+   * returns hive client from session state
+   *
+   * @return
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      // Get the properties from thread local
+      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      if (carbonSessionInfo != null) {
+        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
+        super.createPartitions(tableName, updatedParts, ignoreIfExists)
+      } else {
+        super.createPartitions(tableName, parts, ignoreIfExists)
+      }
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is alternate way of getting partition information. It first fetches all partitions from
+   * hive and then apply filter instead of querying hive along with filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def getPartitionsAlternate(
+      partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
+    val partitionSchema = catalogTable.partitionSchema
+    if (partitionFilters.nonEmpty) {
+      val boundPredicate =
+        InterpretedPredicate.create(partitionFilters.reduce(And).transform {
+          case att: AttributeReference =>
+            val index = partitionSchema.indexWhere(_.name == att.name)
+            BoundReference(index, partitionSchema(index).dataType, nullable = true)
+        })
+      allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
+    } else {
+      allPartitions
+    }
+  }
+}
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ * @param sparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies = extraStrategies
+
+  experimentalMethods.extraOptimizations = extraOptimizations
+
+  def extraStrategies: Seq[Strategy] = {
+    Seq(
+      new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  }
+
+  def extraOptimizations: Seq[Rule[LogicalPlan]] = {
+    Seq(new CarbonIUDRule,
+      new CarbonUDFTransformRule,
+      new CarbonLateDecodeRule)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
+  def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
+    catalog.ParquetConversions ::
+    catalog.OrcConversions ::
+    CarbonPreInsertionCasts(sparkSession) ::
+    CarbonIUDAnalysisRule(sparkSession) ::
+    AnalyzeCreateTable(sparkSession) ::
+    PreprocessTableInsertion(conf) ::
+    DataSourceAnalysis(conf) ::
+    (if (conf.runSQLonFile) {
+      new ResolveDataSource(sparkSession) :: Nil
+    } else {  Nil })
+  }
+
+  override lazy val analyzer: Analyzer =
+    new CarbonAnalyzer(catalog, conf, sparkSession,
+      new Analyzer(catalog, conf) {
+        override val extendedResolutionRules =
+          if (extendedAnalyzerRules.nonEmpty) {
+            extendedAnalyzerRules ++ internalAnalyzerRules
+          } else {
+            internalAnalyzerRules
+          }
+        override val extendedCheckRules = Seq(
+          PreWriteCheck(conf, catalog))
+      }
+  )
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override lazy val catalog = {
+    new CarbonSessionCatalog(
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+      sparkSession.sharedState.globalTempViewManager,
+      sparkSession,
+      functionResourceLoader,
+      functionRegistry,
+      conf,
+      newHadoopConf())
+  }
+}
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: CatalystConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+  }
+}
+
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+    super.execute(transFormedPlan)
+  }
+}
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
+    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule,
+    // And optimize whole plan at once.
+    val transFormedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case p: PredicateSubquery =>
+            val tPlan = p.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
+        }
+    }
+    transFormedPlan
+  }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      helper.createCarbonTable(
+        tableHeader = ctx.createTableHeader,
+        skewSpecContext = ctx.skewSpec,
+        bucketSpecContext = ctx.bucketSpec,
+        partitionColumns = ctx.partitionColumns,
+        columns = ctx.columns,
+        tablePropertyList = ctx.tablePropertyList,
+        locationSpecContext = ctx.locationSpec(),
+        tableComment = Option(ctx.STRING()).map(string),
+        ctas = ctx.AS,
+        query = ctx.query)
+    } else {
+      super.visitCreateTable(ctx)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
deleted file mode 100644
index 3c151f0..0000000
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-
-import scala.collection.generic.SeqFactory
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SQLConf, SessionState}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
-import org.apache.spark.sql.types.DecimalType
-import org.apache.spark.util.CarbonReflectionUtils
-
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    functionRegistry: FunctionRegistry,
-    sparkSession: SparkSession,
-    conf: SQLConf,
-    hadoopConf: Configuration,
-    parser: ParserInterface,
-    functionResourceLoader: FunctionResourceLoader)
-  extends HiveSessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
-    new HiveMetastoreCatalog(sparkSession),
-    functionRegistry,
-    conf,
-    hadoopConf,
-    parser,
-    functionResourceLoader
-  ) {
-
-  lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-
-  def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
-
-
-
-
-  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name)
-    var toRefreshRelation = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
-        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case SubqueryAlias(_, relation) if
-      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
-      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
-      relation.getClass.getName.equals(
-        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
-        val catalogTable =
-          CarbonReflectionUtils.getFieldOfCatalogTable(
-            "tableMeta",
-            relation).asInstanceOf[CatalogTable]
-        toRefreshRelation =
-          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
-      case _ =>
-    }
-
-    if (toRefreshRelation) {
-      super.lookupRelation(name)
-    } else {
-      rtnRelation
-    }
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
-      .asInstanceOf[HiveExternalCatalog].client
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      // Get the properties from thread local
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      if (carbonSessionInfo != null) {
-        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
-        super.createPartitions(tableName, updatedParts, ignoreIfExists)
-      } else {
-        super.createPartitions(tableName, parts, ignoreIfExists)
-      }
-    } catch {
-      case e: Exception =>
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
-    ExternalCatalogUtils.prunePartitionsByFilter(
-      sparkSession.sessionState.catalog.getTableMetadata(identifier),
-      allPartitions,
-      partitionFilters,
-      sparkSession.sessionState.conf.sessionLocalTimeZone)
-  }
-}
-
-
-class CarbonAnalyzer(catalog: SessionCatalog,
-    conf: SQLConf,
-    sparkSession: SparkSession,
-    analyzer: Analyzer) extends Analyzer(catalog, conf) {
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    var logicalPlan = analyzer.execute(plan)
-    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
-    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
-  }
-}
-
-
-/**
- * Session state implementation to override sql parser and adding strategies
- *
- * @param sparkSession
- */
-class CarbonSessionStateBuilder(sparkSession: SparkSession,
-    parentState: Option[SessionState] = None)
-  extends HiveSessionStateBuilder(sparkSession, parentState) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies =
-    Seq(new StreamingTableStrategy(sparkSession),
-        new CarbonLateDecodeStrategy,
-        new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule,
-    new CarbonLateDecodeRule)
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  /**
-   * Create a [[CarbonSessionCatalogBuild]].
-   */
-  override protected lazy val catalog: CarbonSessionCatalog = {
-    val catalog = new CarbonSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      sparkSession,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
-  /**
-   * Create a Hive aware resource loader.
-   */
-  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
-    new Analyzer(catalog, conf) {
-
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new ResolveHiveSerdeTable(session) +:
-        new FindDataSourceTable(session) +:
-        new ResolveSQLOnFile(session) +:
-        new CarbonIUDAnalysisRule(sparkSession) +:
-        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-
-      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
-      PreWriteCheck :: HiveOnlyCheck :: Nil
-
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        new DetermineTableStats(session) +:
-        RelationConversions(conf, catalog) +:
-        PreprocessTableCreation(session) +:
-        PreprocessTableInsertion(conf) +:
-        DataSourceAnalysis(conf) +:
-        HiveAnalysis +:
-        customPostHocResolutionRules
-    }
-  )
-
-  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-
-}
-
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
-    // optimize whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case e: Exists =>
-            val tPlan = e.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
-
-          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
-            val tPlan = sub.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            In(value, Seq(ListQuery(tPlan, l.children , exprId)))
-        }
-    }
-    super.execute(transFormedPlan)
-  }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(
-        tableHeader = ctx.createTableHeader,
-        skewSpecContext = ctx.skewSpec,
-        bucketSpecContext = ctx.bucketSpec,
-        partitionColumns = ctx.partitionColumns,
-        columns = ctx.columns,
-        tablePropertyList = ctx.tablePropertyList,
-        locationSpecContext = ctx.locationSpec(),
-        tableComment = Option(ctx.STRING()).map(string),
-        ctas = ctx.AS,
-        query = ctx.query)
-    } else {
-      super.visitCreateHiveTable(ctx)
-    }
-  }
-
-  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
-    val newColumn = visitColType(ctx.colType)
-    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
-      throw new MalformedCarbonCommandException(
-        "Column names provided are different. Both the column names should be same")
-    }
-
-    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
-      case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
-      case _ => (newColumn.dataType.typeName.toLowerCase, None)
-    }
-
-    val alterTableChangeDataTypeModel =
-      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
-        new CarbonSpark2SqlParser()
-          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
-        ctx.tableIdentifier().table.getText.toLowerCase,
-        ctx.identifier.getText.toLowerCase,
-        newColumn.name.toLowerCase)
-
-    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
-  }
-
-
-  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-
-    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
-    val fields = parser.getFields(cols)
-    val tblProperties = scala.collection.mutable.Map.empty[String, String]
-    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
-        .map(_.getText)),
-      ctx.tableIdentifier.table.getText.toLowerCase,
-      fields,
-      Seq.empty,
-      tblProperties,
-      None,
-      true)
-
-    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
-      Option(ctx.tableIdentifier().db).map(_.getText),
-      ctx.tableIdentifier.table.getText,
-      tblProperties.toMap,
-      tableModel.dimCols,
-      tableModel.msrCols,
-      tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
-    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
-  }
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    super.visitCreateTable(ctx)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
deleted file mode 100644
index 2128ffd..0000000
--- a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.buildConf
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-      buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To set carbon task distribution.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    val BAD_RECORDS_LOGGER_ENABLE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BATCH_SORT_SIZE_INMB =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
-        .doc("Property to specify batch sort size in MB.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    val SINGLE_PASS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
-        .doc("Property to enable/disable single_pass.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    val BAD_RECORD_PATH =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = buildConf(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-  /**
-   * to set the dynamic properties default values
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..3c151f0
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+
+import scala.collection.generic.SeqFactory
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.util.CarbonReflectionUtils
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) {
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+
+
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    var toRefreshRelation = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
+        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case SubqueryAlias(_, relation) if
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+      relation.getClass.getName.equals(
+        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation).asInstanceOf[CatalogTable]
+        toRefreshRelation =
+          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+      case _ =>
+    }
+
+    if (toRefreshRelation) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * returns hive client from HiveExternalCatalog
+   *
+   * @return
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      // Get the properties from thread local
+      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      if (carbonSessionInfo != null) {
+        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
+        super.createPartitions(tableName, updatedParts, ignoreIfExists)
+      } else {
+        super.createPartitions(tableName, parts, ignoreIfExists)
+      }
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is alternate way of getting partition information. It first fetches all partitions from
+   * hive and then apply filter instead of querying hive along with filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+  }
+}
+
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+  }
+}
+
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+        new CarbonLateDecodeStrategy,
+        new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  /**
+   * Create a [[CarbonSessionCatalogBuild]].
+   */
+  override protected lazy val catalog: CarbonSessionCatalog = {
+    val catalog = new CarbonSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new ResolveHiveSerdeTable(session) +:
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck :: HiveOnlyCheck :: Nil
+
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        new DetermineTableStats(session) +:
+        RelationConversions(conf, catalog) +:
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        HiveAnalysis +:
+        customPostHocResolutionRules
+    }
+  )
+
+  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
+
+}
+
+
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
+    // optimize whole plan at once.
+    val transFormedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case e: Exists =>
+            val tPlan = e.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
+
+          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
+            val tPlan = sub.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            In(value, Seq(ListQuery(tPlan, l.children , exprId)))
+        }
+    }
+    super.execute(transFormedPlan)
+  }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      helper.createCarbonTable(
+        tableHeader = ctx.createTableHeader,
+        skewSpecContext = ctx.skewSpec,
+        bucketSpecContext = ctx.bucketSpec,
+        partitionColumns = ctx.partitionColumns,
+        columns = ctx.columns,
+        tablePropertyList = ctx.tablePropertyList,
+        locationSpecContext = ctx.locationSpec(),
+        tableComment = Option(ctx.STRING()).map(string),
+        ctas = ctx.AS,
+        query = ctx.query)
+    } else {
+      super.visitCreateHiveTable(ctx)
+    }
+  }
+
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+
+    val newColumn = visitColType(ctx.colType)
+    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      throw new MalformedCarbonCommandException(
+        "Column names provided are different. Both the column names should be same")
+    }
+
+    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+      case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+      case _ => (newColumn.dataType.typeName.toLowerCase, None)
+    }
+
+    val alterTableChangeDataTypeModel =
+      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+        new CarbonSpark2SqlParser()
+          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+        ctx.tableIdentifier().table.getText.toLowerCase,
+        ctx.identifier.getText.toLowerCase,
+        newColumn.name.toLowerCase)
+
+    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+  }
+
+
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val fields = parser.getFields(cols)
+    val tblProperties = scala.collection.mutable.Map.empty[String, String]
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
+        .map(_.getText)),
+      ctx.tableIdentifier.table.getText.toLowerCase,
+      fields,
+      Seq.empty,
+      tblProperties,
+      None,
+      true)
+
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      Option(ctx.tableIdentifier().db).map(_.getText),
+      ctx.tableIdentifier.table.getText,
+      tblProperties.toMap,
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+  }
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    super.visitCreateTable(ctx)
+  }
+}