Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/20 15:09:37 UTC

carbondata git commit: [CARBONDATA-2312] Support In Memory Catalog

Repository: carbondata
Updated Branches:
  refs/heads/master a9d5e9dec -> e6d03d112


[CARBONDATA-2312] Support In Memory Catalog

Support storing the catalog in memory (not in Hive) for each session; after a session restart, the user can create an external table and run select queries

This closes #2103
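
For illustration (not part of this commit), a minimal usage sketch of the flow this enables, using the enableInMemoryCatalog() builder method added below; the store path, table name, and external-table syntax are assumptions:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.CarbonSession._

  // Build a session whose catalog lives in memory instead of the Hive metastore.
  val spark = SparkSession.builder()
    .master("local")
    .appName("InMemoryCatalogExample")
    .enableInMemoryCatalog()
    .getOrCreateCarbonSession("/tmp/carbon-store", "/tmp/carbon-meta")

  // After a restart the in-memory catalog is empty, so re-register existing data
  // as an external table and query it.
  spark.sql(
    "CREATE EXTERNAL TABLE sales STORED BY 'carbondata' " +
    "LOCATION '/tmp/carbon-store/default/sales'")
  spark.sql("SELECT count(*) FROM sales").show()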


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e6d03d11
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e6d03d11
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e6d03d11

Branch: refs/heads/master
Commit: e6d03d11212f8c5c66052b0f6e97144cf327bea6
Parents: a9d5e9d
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Apr 4 12:03:47 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Apr 20 23:09:09 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/common/util/QueryTest.scala       |   2 +-
 .../spark/util/CarbonReflectionUtils.scala      |  25 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   5 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  23 +-
 .../CarbonAlterTableCompactionCommand.scala     |   5 +-
 .../CarbonAlterTableAddColumnCommand.scala      |  11 +-
 .../CarbonAlterTableDataTypeChangeCommand.scala |  11 +-
 .../CarbonAlterTableDropColumnCommand.scala     |  17 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |  40 +--
 .../schema/CarbonAlterTableSetCommand.scala     |   4 +-
 .../schema/CarbonAlterTableUnsetCommand.scala   |   4 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |   9 +-
 .../spark/sql/hive/CarbonSessionCatalog.scala   |  56 +++-
 .../org/apache/spark/util/AlterTableUtil.scala  |  27 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  50 +++-
 .../apache/spark/sql/hive/CarbonAnalyzer.scala  |  34 +++
 .../sql/hive/CarbonInMemorySessionState.scala   | 276 +++++++++++++++++++
 .../apache/spark/sql/hive/CarbonOptimizer.scala |  77 ++++++
 .../spark/sql/hive/CarbonSessionState.scala     | 231 ++++------------
 .../spark/sql/hive/CarbonSqlAstBuilder.scala    | 124 +++++++++
 20 files changed, 777 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9c5bc38..d45e759 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.LoadDataCommand
-import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog}
+import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
 import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
 import org.scalatest.Suite

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 69eb021..4264aa1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -194,19 +194,30 @@ object CarbonReflectionUtils {
     }
   }
 
-  def getSessionState(sparkContext: SparkContext, carbonSession: Object): Any = {
+  def getSessionState(sparkContext: SparkContext,
+      carbonSession: Object,
+      useHiveMetaStore: Boolean): Any = {
     if (SPARK_VERSION.startsWith("2.1")) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
         "org.apache.spark.sql.hive.CarbonSessionState")
       createObject(className, carbonSession)._1
     } else if (SPARK_VERSION.startsWith("2.2")) {
-      val className = sparkContext.conf.get(
-        CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
-        "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
-      val tuple = createObject(className, carbonSession, None)
-      val method = tuple._2.getMethod("build")
-      method.invoke(tuple._1)
+      if (useHiveMetaStore) {
+        val className = sparkContext.conf.get(
+          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+          "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
+        val tuple = createObject(className, carbonSession, None)
+        val method = tuple._2.getMethod("build")
+        method.invoke(tuple._1)
+      } else {
+        val className = sparkContext.conf.get(
+          CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+          "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
+        val tuple = createObject(className, carbonSession, None)
+        val method = tuple._2.getMethod("build")
+        method.invoke(tuple._1)
+      }
     } else {
       throw new UnsupportedOperationException("Spark version not supported")
     }
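
The createObject helper used above is defined elsewhere in CarbonReflectionUtils; a minimal sketch of that reflective construction style (an assumption about its shape, not the committed implementation):

  // Instantiate a class by name, returning the instance together with its Class
  // so callers can make further reflective calls such as getMethod("build").
  // Assumes the target class has a single usable public constructor.
  def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
    val clazz = Class.forName(className)
    val ctor = clazz.getConstructors.head
    ctor.setAccessible(true)
    (ctor.newInstance(conArgs: _*), clazz)
  }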

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 593ecce..00e0aed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -23,6 +23,7 @@ import scala.util.Try
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive.{HiveSessionCatalog, _}
@@ -123,7 +124,7 @@ object CarbonEnv {
 
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
     if (sparkSession.isInstanceOf[CarbonSession]) {
-      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv()
+      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv
     } else {
       var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
       if (carbonEnv == null) {
@@ -249,7 +250,7 @@ object CarbonEnv {
    */
   def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = {
     var databaseLocation =
-      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
+      sparkSession.sessionState.catalog.asInstanceOf[SessionCatalog].getDatabaseMetadata(dbName)
         .locationUri.toString
     // for default database and db ends with .db
     // check whether the carbon store and hive store is same or different.
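
The cast now targets Spark's base SessionCatalog, which both the Hive-backed catalog and the new in-memory catalog extend, so the same lookup works for either backing. A minimal sketch of why this is safe:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.catalog.SessionCatalog

  // getDatabaseMetadata is defined on the common SessionCatalog base class,
  // so this works whether the session uses the Hive or the in-memory catalog.
  def databaseLocation(spark: SparkSession, dbName: String): String = {
    val catalog: SessionCatalog = spark.sessionState.catalog
    catalog.getDatabaseMetadata(dbName).locationUri.toString
  }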

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7ee3038..1d5a82d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -43,7 +43,8 @@ import org.apache.carbondata.streaming.CarbonStreamingQueryListener
  * User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
  */
 class CarbonSession(@transient val sc: SparkContext,
-    @transient private val existingSharedState: Option[SharedState]
+    @transient private val existingSharedState: Option[SharedState],
+    @transient useHiveMetaStore: Boolean = true
 ) extends SparkSession(sc) { self =>
 
   def this(sc: SparkContext) {
@@ -51,8 +52,10 @@ class CarbonSession(@transient val sc: SparkContext,
   }
 
   @transient
-  override lazy val sessionState: SessionState =
-    CarbonReflectionUtils.getSessionState(sparkContext, this).asInstanceOf[SessionState]
+  override lazy val sessionState: SessionState = {
+    CarbonReflectionUtils.getSessionState(sparkContext, this, useHiveMetaStore)
+      .asInstanceOf[SessionState]
+  }
 
   /**
    * State shared across sessions, including the `SparkContext`, cached data, listener,
@@ -74,7 +77,7 @@ class CarbonSession(@transient val sc: SparkContext,
   }
 
   override def newSession(): SparkSession = {
-    new CarbonSession(sparkContext, Some(sharedState))
+    new CarbonSession(sparkContext, Some(sharedState), useHiveMetaStore)
   }
 
   override def sql(sqlText: String): DataFrame = {
@@ -116,10 +119,16 @@ object CarbonSession {
 
   private val statementId = new AtomicLong(0)
 
+  private var enableInMemCatalog: Boolean = false
+
   private[sql] val threadStatementId = new ThreadLocal[Long]()
 
   implicit class CarbonBuilder(builder: Builder) {
 
+    def enableInMemoryCatalog(): Builder = {
+      enableInMemCatalog = true
+      builder
+    }
     def getOrCreateCarbonSession(): SparkSession = {
       getOrCreateCarbonSession(null, null)
     }
@@ -132,7 +141,9 @@ object CarbonSession {
 
     def getOrCreateCarbonSession(storePath: String,
         metaStorePath: String): SparkSession = synchronized {
-      builder.enableHiveSupport()
+      if (!enableInMemCatalog) {
+        builder.enableHiveSupport()
+      }
       val options =
         getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
       val userSuppliedContext: Option[SparkContext] =
@@ -205,7 +216,7 @@ object CarbonSession {
           sc
         }
 
-        session = new CarbonSession(sparkContext)
+        session = new CarbonSession(sparkContext, None, !enableInMemCatalog)
         val carbonProperties = CarbonProperties.getInstance()
         if (storePath != null) {
           carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
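
Note how useHiveMetaStore threads through: the builder flag decides it once, the session stores it, and newSession() forwards it, so derived sessions keep the same catalog backing. A hypothetical check, assuming the CarbonSession `spark` from the earlier sketch:

  // Hypothetical: a session derived via newSession() builds the same kind of
  // session state, hence the same catalog implementation, as its parent.
  val derived = spark.newSession()
  assert(derived.sessionState.catalog.getClass == spark.sessionState.catalog.getClass)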

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index a7b5f7e..c462c9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.AlterTableUtil
@@ -299,8 +299,7 @@ case class CarbonAlterTableCompactionCommand(
           tableIdentifier,
           Map("streaming" -> "false"),
           Seq.empty,
-          true)(sparkSession,
-          sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+          true)(sparkSession)
         // 5. remove checkpoint
         FileFactory.deleteAllFilesOfDir(
           new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 4962c3a..d33fc6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -85,11 +85,14 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       schemaEvolutionEntry.setAdded(newCols.toList.asJava)
       val thriftTable = schemaConverter
         .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable,
+      val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
+          carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
-          thriftTable)(sparkSession,
-          sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+          thriftTable,
+          Some(newCols))(sparkSession)
+      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
+        tableIdentifier, schemaParts, cols)
+      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
         new AlterTableAddColumnPostEvent(sparkSession,
           carbonTable, alterTableAddColumnsModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index ff17cfd..accaa27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
@@ -96,9 +97,13 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
       tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
         .setTime_stamp(System.currentTimeMillis)
-      AlterTableUtil.updateSchemaInfo(
-        carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
-          sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+      val columns = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+      val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
+        carbonTable, schemaEvolutionEntry, tableInfo, Some(columns))(sparkSession)
+      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+        .alterColumnChangeDataType(tableIdentifier, schemaParts, cols)
+      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
         new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,
           alterTableDataTypeChangeModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index a64bdb9..ff1541b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -27,9 +27,9 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -116,9 +116,18 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       timeStamp = System.currentTimeMillis
       val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
       schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
-          sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+      val delCols = deletedColumnSchema.map { deletedColumn =>
+        schemaConverter.fromExternalToWrapperColumnSchema(deletedColumn)
+      }
+      val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
+        carbonTable,
+        schemaEvolutionEntry,
+        tableInfo,
+        Some(delCols))(sparkSession)
+      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+        .alterDropColumns(tableIdentifier, schemaParts, cols)
+      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
       // delete dictionary files for dictionary column and clear dictionary cache from memory
       new AlterTableDropColumnRDD(sparkSession.sparkContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index e349e93..af52d6b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -125,26 +125,21 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val fileType = FileFactory.getFileType(tableMetadataFile)
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+      val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
+      val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
       var newTablePath = CarbonTablePath.getNewTablePath(
         oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
-      val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-        .getClient()
       var partitions: Seq[CatalogTablePartition] = Seq.empty
       if (carbonTable.isHivePartitionTable) {
         partitions =
-          sparkSession.sessionState.catalog.listPartitions(
-            TableIdentifier(oldTableName, Some(oldDatabaseName)))
+          sparkSession.sessionState.catalog.listPartitions(oldIdentifier)
       }
-      sparkSession.catalog.refreshTable(TableIdentifier(oldTableName,
-        Some(oldDatabaseName)).quotedString)
-      hiveClient.runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
-      hiveClient.runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
-          s"('tableName'='$newTableName', " +
-          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
-
+      sparkSession.catalog.refreshTable(oldIdentifier.quotedString)
+      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
+          oldIdentifier,
+          newIdentifier,
+          newTablePath)
       // changed the rename order to deal with situation when carbon table and hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -160,16 +155,19 @@ private[sql] case class CarbonAlterTableRenameCommand(
         partitions,
         oldTableIdentifier.getTablePath,
         newTablePath,
-        sparkSession)
+        sparkSession,
+        newIdentifier.table,
+        oldDatabaseName)
 
-      val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
       val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier)
       // Update the storage location with new path
       sparkSession.sessionState.catalog.alterTable(
         catalogTable.copy(storage = sparkSession.sessionState.catalog.
           asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
           new Path(newTablePath),
-          catalogTable.storage)))
+          catalogTable.storage,
+          newIdentifier.table,
+          oldDatabaseName)))
       if (updatedParts.nonEmpty) {
         // Update the new updated partitions specs with new location.
         sparkSession.sessionState.catalog.alterPartitions(
@@ -232,14 +230,20 @@ private[sql] case class CarbonAlterTableRenameCommand(
       partitions: Seq[CatalogTablePartition],
       oldTablePath: String,
       newTablePath: String,
-      sparkSession: SparkSession): Seq[CatalogTablePartition] = {
+      sparkSession: SparkSession,
+      newTableName: String,
+      dbName: String): Seq[CatalogTablePartition] = {
     partitions.map{ part =>
       if (part.storage.locationUri.isDefined) {
         val path = new Path(part.location)
         if (path.toString.contains(oldTablePath)) {
           val newPath = new Path(path.toString.replace(oldTablePath, newTablePath))
           part.copy(storage = sparkSession.sessionState.catalog.
-            asInstanceOf[CarbonSessionCatalog].updateStorageLocation(newPath, part.storage))
+            asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
+              newPath,
+              part.storage,
+              newTableName,
+              dbName))
         } else {
           part
         }
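
The partition relocation above is a plain prefix replacement on the location path. A worked example with hypothetical paths:

  import org.apache.hadoop.fs.Path

  val oldTablePath = "/store/default/t1"
  val newTablePath = "/store/default/t2"
  val partLocation = new Path("/store/default/t1/part=1")
  // Mirrors the replace performed above; yields /store/default/t2/part=1
  val rewritten = new Path(partLocation.toString.replace(oldTablePath, newTablePath))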

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
index 51c0e6e..ffd69df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.schema
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.util.AlterTableUtil
 
 private[sql] case class CarbonAlterTableSetCommand(
@@ -38,8 +37,7 @@ private[sql] case class CarbonAlterTableSetCommand(
       tableIdentifier,
       properties,
       Nil,
-      set = true)(sparkSession,
-      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+      set = true)(sparkSession)
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
index 2490f9e..d5bdd80 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.schema
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.util.AlterTableUtil
 
 
@@ -37,8 +36,7 @@ private[sql] case class CarbonAlterTableUnsetCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String],
-      propKeys, false)(sparkSession,
-      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+      propKeys, false)(sparkSession)
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 96ef473..76aa73e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -18,8 +18,7 @@ package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonSession, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -168,10 +167,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val dbName = newTableIdentifier.getDatabaseName
     val tableName = newTableIdentifier.getTableName
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
-    val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-      .getClient()
-    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
-
+    sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+      .alterTable(TableIdentifier(tableName, Some(dbName)), schemaParts, None)
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index 1c69309..f00739e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -63,5 +63,59 @@ trait CarbonSessionCatalog {
   /**
   * Update the storage format with the new location information
    */
-  def updateStorageLocation(path: Path, storage: CatalogStorageFormat): CatalogStorageFormat
+  def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat
+
+  /**
+   * Method used to update the table name
+   * @param oldTableIdentifier old table identifier
+   * @param newTableIdentifier new table identifier
+   * @param newTablePath new table path
+   */
+  def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit
+
+  /**
+   * Updates the table's SerDe properties.
+   * @param tableIdentifier table identifier
+   * @param schemaParts serialized schema properties
+   * @param cols affected column schemas, if any
+   */
+  def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+
+  /**
+   * Adds new columns to the table.
+   * @param tableIdentifier table identifier
+   * @param schemaParts serialized schema properties
+   * @param cols column schemas of the added columns
+   */
+  def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+
+  /**
+   * Drops columns from the table.
+   * @param tableIdentifier table identifier
+   * @param schemaParts serialized schema properties
+   * @param cols column schemas of the dropped columns
+   */
+  def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+
+  /**
+   * Alters the data type of a column in the table schema.
+   * @param tableIdentifier table identifier
+   * @param schemaParts serialized schema properties
+   * @param cols column schemas of the changed columns
+   */
+  def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
 }
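
With these hooks, alter commands stop talking to the Hive client directly and instead dispatch through whichever CarbonSessionCatalog implementation the session was built with. A sketch of the caller-side shape (mirroring the command changes above):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.hive.CarbonSessionCatalog
  import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

  // The Hive-backed implementation issues ALTER TABLE ... SET TBLPROPERTIES,
  // while the in-memory one edits the CatalogTable schema (both later in this commit).
  def addColumns(spark: SparkSession, table: TableIdentifier, schemaParts: String,
      cols: Option[Seq[ColumnSchema]]): Unit = {
    spark.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
      .alterAddColumns(table, schemaParts, cols)
  }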

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 1c6756b..ac2bf9f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
-import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -36,9 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 
 object AlterTableUtil {
@@ -127,11 +124,16 @@ object AlterTableUtil {
    * @param schemaEvolutionEntry
    * @param thriftTable
    * @param sparkSession
-   * @param catalog
    */
   def updateSchemaInfo(carbonTable: CarbonTable,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      thriftTable: TableInfo)(sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
+      thriftTable: TableInfo,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]] =
+      None)
+    (sparkSession: SparkSession):
+    (TableIdentifier,
+      String,
+      Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) = {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -145,9 +147,7 @@ object AlterTableUtil {
     val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession).schema.json
     val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
-    val hiveClient = catalog.getClient();
-    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
-    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+    (tableIdentifier, schemaParts, cols)
   }
 
   /**
@@ -306,11 +306,10 @@ object AlterTableUtil {
    * @param propKeys
    * @param set
    * @param sparkSession
-   * @param catalog
    */
   def modifyTableProperties(tableIdentifier: TableIdentifier, properties: Map[String, String],
       propKeys: Seq[String], set: Boolean)
-    (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
+    (sparkSession: SparkSession): Unit = {
     val tableName = tableIdentifier.table
     val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName")
@@ -360,10 +359,12 @@ object AlterTableUtil {
           }
         }
       }
-
-      updateSchemaInfo(carbonTable,
+      val (tableIdentifier, schemParts, cols) = updateSchemaInfo(carbonTable,
         schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
-        thriftTable)(sparkSession, catalog)
+        thriftTable)(sparkSession)
+      sparkSession.asInstanceOf[CarbonSession].sessionState.catalog
+        .asInstanceOf[CarbonSessionCatalog].alterTable(tableIdentifier, schemParts, cols)
+      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName")
     } catch {
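
After this refactor, updateSchemaInfo only prepares the change; applying it and refreshing the table moves to the caller, as shown just above. The contract, restated as a sketch (assumes carbonTable, schemaEvolutionEntry, and thriftTable in scope):

  val (tableIdentifier, schemaParts, cols) =
    AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, thriftTable)(sparkSession)
  sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
    .alterTable(tableIdentifier, schemaParts, cols)
  sparkSession.catalog.refreshTable(tableIdentifier.quotedString)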

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index c37e6fa..9fe7241 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -72,7 +72,7 @@ class CarbonHiveSessionCatalog(
     conf,
     hadoopConf) with CarbonSessionCatalog {
 
-  lazy val carbonEnv = {
+  private lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
     env
@@ -84,6 +84,50 @@ class CarbonHiveSessionCatalog(
   override def getCarbonEnv() : CarbonEnv = {
     carbonEnv
   }
+
+  def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table }" +
+      s" RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }" +
+      s" SET SERDEPROPERTIES" +
+      s"('tableName'='${ newTableIdentifier.table }', " +
+      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+  }
+
+  def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    getClient()
+      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${tableIdentifier.table } " +
+                  s"SET TBLPROPERTIES(${ schemaParts })")
+  }
+
+  def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
   // Initialize all listeners to the Operation bus.
   CarbonEnv.init(sparkSession)
 
@@ -195,7 +239,9 @@ class CarbonHiveSessionCatalog(
    */
   override def updateStorageLocation(
       path: Path,
-      storage: CatalogStorageFormat): CatalogStorageFormat = {
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
     storage.copy(locationUri = Some(path.toString))
   }
 }
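
For a concrete rename, the two runSqlHive calls above interpolate to plain HiveQL. With old table default.t1, new name t2, and new path /store/default/t2 (hypothetical values), the Hive client receives roughly:

  ALTER TABLE default.t1 RENAME TO default.t2
  ALTER TABLE default.t2 SET SERDEPROPERTIES('tableName'='t2', 'dbName'='default', 'tablePath'='/store/default/t2')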

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
new file mode 100644
index 0000000..88beb68
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.internal.SQLConf
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+  }
+}
\ No newline at end of file
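
CarbonAnalyzer decorates the stock analyzer: it resolves the plan first, then applies the Carbon pre-aggregate rules to the resolved plan. A hedged wiring sketch (the in-memory builder later in this commit wires it the same way):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.analysis.Analyzer
  import org.apache.spark.sql.hive.CarbonAnalyzer

  // Wrap a plain Analyzer so every analyzed plan also passes through
  // the Carbon pre-aggregate rules; assumes an existing SparkSession.
  def carbonAnalyzer(spark: SparkSession): Analyzer = {
    val catalog = spark.sessionState.catalog
    val conf = spark.sessionState.conf
    new CarbonAnalyzer(catalog, conf, spark, new Analyzer(catalog, conf))
  }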

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
new file mode 100644
index 0000000..e8ab84a
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the Carbon catalog and refreshes the relation from the cache if the
+ * CarbonTable in the Carbon catalog differs from the cached Carbon relation's CarbonTable.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class InMemorySessionCatalog(
+    externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends SessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  override def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    // Not required for the in-memory catalog
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val structType = catalogTable.schema
+    var newStructType = structType
+    newColumns.get.foreach { col =>
+      newStructType = newStructType
+        .add(col.getColumnName, CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
+    }
+    alterSchema(newStructType, catalogTable, tableIdentifier)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val fields = catalogTable.schema.fields.filterNot { field =>
+      dropCols.get.exists { col =>
+        col.getColumnName.equalsIgnoreCase(field.name)
+      }
+    }
+    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val updatedFields = catalogTable.schema.fields.flatMap { field =>
+      columns.get.map { col =>
+        if (col.getColumnName.equalsIgnoreCase(field.name)) {
+          StructField(col.getColumnName,
+            CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
+        } else {
+          field
+        }
+      }
+    }
+    alterSchema(new StructType(updatedFields), catalogTable, tableIdentifier)
+  }
+
+  private def alterSchema(structType: StructType,
+      catalogTable: CatalogTable,
+      tableIdentifier: TableIdentifier): Unit = {
+    val copy = catalogTable.copy(schema = structType)
+    sparkSession.sessionState.catalog.alterTable(copy)
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  def getThriftTableInfo(tablePath: String): TableInfo = {
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+    CarbonUtil.readSchemaFile(tableMetadataFile)
+  }
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * Returns the Hive client. The in-memory catalog has no Hive client,
+   * so this implementation returns null.
+   * @return the Hive client, or null for the in-memory catalog
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    null
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from the catalog and then applies the filters, instead of querying the catalog with the filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Update the storage format with the new location information
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+class CarbonInMemorySessionStateBuilder(sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends SessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override protected lazy val catalog: InMemorySessionCatalog = {
+    val catalog = new InMemorySessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: ExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
+
+  override protected lazy val resourceLoader: SessionResourceLoader = {
+    new SessionResourceLoader(session)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+        PreWriteCheck :: HiveOnlyCheck :: Nil
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        customPostHocResolutionRules
+    }
+  )
+  override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
+}
+
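
Unlike the Hive-backed catalog, the in-memory implementations above apply schema changes by editing the Spark StructType directly. The core edits, isolated from the Carbon column types (a minimal, self-contained sketch):

  import org.apache.spark.sql.types._

  val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
  // alterAddColumns: append a field for each new column.
  val added = schema.add("c", DecimalType(10, 2))
  // alterDropColumns: filter fields out by (case-insensitive) name.
  val dropped = StructType(added.fields.filterNot(_.name.equalsIgnoreCase("b")))
  // The edited schema is then persisted via alterTable(catalogTable.copy(schema = ...)).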

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
new file mode 100644
index 0000000..a046763
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+
+
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val transformedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+    super.execute(transformedPlan)
+  }
+}
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
+    // For scalar subqueries, set a flag on the relation so the optimizer rule can skip the
+    // decoder plan, and optimize the whole plan at once.
+    val transformedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case e: Exists =>
+            val tPlan = e.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
+
+          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
+            val tPlan = sub.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
+        }
+    }
+    transformedPlan
+  }
+}
\ No newline at end of file

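The three cases handled by transformForScalarSubQuery above correspond to the
query shapes below; in each one the CarbonDatasourceHadoopRelation inside the
subquery gets its isSubquery flag set before the plan is optimized as a whole.
A hypothetical illustration (the table names are made up):

    // ScalarSubquery: a single-value subquery in the select list.
    spark.sql("SELECT a, (SELECT max(b) FROM carbon_t2) FROM carbon_t1")
    // Exists: a correlated EXISTS predicate in the filter.
    spark.sql("SELECT a FROM carbon_t1 t WHERE EXISTS " +
      "(SELECT 1 FROM carbon_t2 u WHERE u.a = t.a)")
    // In with ListQuery: an IN predicate over a subquery.
    spark.sql("SELECT a FROM carbon_t1 WHERE a IN (SELECT a FROM carbon_t2)")
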
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 479c9ce..de37a35 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -22,29 +22,18 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SQLConf, SessionState}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
-import org.apache.spark.sql.types.DecimalType
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
@@ -79,7 +68,7 @@ class CarbonHiveSessionCatalog(
     functionResourceLoader
   ) with CarbonSessionCatalog {
 
-  lazy val carbonEnv = {
+  private lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
     env
@@ -95,9 +84,6 @@ class CarbonHiveSessionCatalog(
   // Initialize all listeners to the Operation bus.
   CarbonEnv.initListeners()
 
-
-
-
   override def lookupRelation(name: TableIdentifier): LogicalPlan = {
     val rtnRelation = super.lookupRelation(name)
     val isRelationRefreshed =
@@ -119,6 +105,49 @@ class CarbonHiveSessionCatalog(
       .asInstanceOf[HiveExternalCatalog].client
   }
 
+  def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
+      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
+      s"SET SERDEPROPERTIES" +
+      s"('tableName'='${ newTableIdentifier.table }', " +
+      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+  }
+
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    getClient()
+      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
+                  s"SET TBLPROPERTIES(${ schemaParts })")
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    alterTable(tableIdentifier, schemaParts, cols)
+  }
+
   override def createPartitions(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
@@ -152,24 +181,13 @@ class CarbonHiveSessionCatalog(
    */
   override def updateStorageLocation(
       path: Path,
-      storage: CatalogStorageFormat): CatalogStorageFormat = {
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
     storage.copy(locationUri = Some(path.toUri))
   }
 }
 
-
-class CarbonAnalyzer(catalog: SessionCatalog,
-    conf: SQLConf,
-    sparkSession: SparkSession,
-    analyzer: Analyzer) extends Analyzer(catalog, conf) {
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    var logicalPlan = analyzer.execute(plan)
-    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
-    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
-  }
-}
-
-
 /**
  * Session state implementation to override sql parser and adding strategies
  *
@@ -194,7 +212,7 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
    * Internal catalog for managing table and database states.
    */
   /**
-   * Create a [[CarbonSessionCatalogBuild]].
+   * Create a [[CarbonHiveSessionCatalog]].
    */
   override protected lazy val catalog: CarbonHiveSessionCatalog = {
     val catalog = new CarbonHiveSessionCatalog(
@@ -248,147 +266,4 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
   )
 
   override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-
-}
-
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
-    super.execute(transFormedPlan)
-  }
-}
-
-object CarbonOptimizerUtil {
-  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
-    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
-    // optimize whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case e: Exists =>
-            val tPlan = e.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
-
-          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
-            val tPlan = sub.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
-        }
-    }
-    transFormedPlan
-  }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("carbondata") ||
-        fileStorage.equalsIgnoreCase("'carbonfile'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
-        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
-        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
-      helper.createCarbonTable(createTableTuple)
-    } else {
-      super.visitCreateHiveTable(ctx)
-    }
-  }
-
-  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
-    val newColumn = visitColType(ctx.colType)
-    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
-      throw new MalformedCarbonCommandException(
-        "Column names provided are different. Both the column names should be same")
-    }
-
-    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
-      case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
-      case _ => (newColumn.dataType.typeName.toLowerCase, None)
-    }
-
-    val alterTableChangeDataTypeModel =
-      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
-        new CarbonSpark2SqlParser()
-          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
-        ctx.tableIdentifier().table.getText.toLowerCase,
-        ctx.identifier.getText.toLowerCase,
-        newColumn.name.toLowerCase)
-
-    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
-  }
-
-
-  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-
-    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
-    val fields = parser.getFields(cols)
-    val tblProperties = scala.collection.mutable.Map.empty[String, String]
-    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
-        .map(_.getText)),
-      ctx.tableIdentifier.table.getText.toLowerCase,
-      fields,
-      Seq.empty,
-      tblProperties,
-      None,
-      true)
-
-    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
-      Option(ctx.tableIdentifier().db).map(_.getText),
-      ctx.tableIdentifier.table.getText,
-      tblProperties.toMap,
-      tableModel.dimCols,
-      tableModel.msrCols,
-      tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
-    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
-  }
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    super.visitCreateTable(ctx)
-  }
-
-  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
-    withOrigin(ctx) {
-      if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
-          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
-        super.visitShowTables(ctx)
-      } else {
-        CarbonShowTablesCommand(
-          Option(ctx.db).map(_.getText),
-          Option(ctx.pattern).map(string))
-      }
-    }
-  }
-}
+}
\ No newline at end of file

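To make alterTableRename above concrete: renaming default.old_t to
default.new_t with a new table path of /store/default/new_t (all names
hypothetical) would run the following two statements through runSqlHive. The
first renames the table in the metastore; the second repoints the Carbon serde
properties at the new table name and path:

    ALTER TABLE default.old_t RENAME TO default.new_t
    ALTER TABLE default.new_t SET SERDEPROPERTIES
    ('tableName'='new_t', 'dbName'='default', 'tablePath'='/store/default/new_t')
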
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
new file mode 100644
index 0000000..b0702ae
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
+import org.apache.spark.sql.types.DecimalType
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("carbondata") ||
+        fileStorage.equalsIgnoreCase("'carbonfile'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
+        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
+      helper.createCarbonTable(createTableTuple)
+    } else {
+      super.visitCreateHiveTable(ctx)
+    }
+  }
+
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+
+    val newColumn = visitColType(ctx.colType)
+    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      throw new MalformedCarbonCommandException(
+        "Column names provided are different. Both the column names should be same")
+    }
+
+    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+      case _ => (newColumn.dataType.typeName.toLowerCase, None)
+    }
+
+    val alterTableChangeDataTypeModel =
+      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+        new CarbonSpark2SqlParser()
+          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+        ctx.tableIdentifier().table.getText.toLowerCase,
+        ctx.identifier.getText.toLowerCase,
+        newColumn.name.toLowerCase)
+
+    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+  }
+
+
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val fields = parser.getFields(cols)
+    val tblProperties = scala.collection.mutable.Map.empty[String, String]
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
+        .map(_.getText)),
+      ctx.tableIdentifier.table.getText.toLowerCase,
+      fields,
+      Seq.empty,
+      tblProperties,
+      None,
+      true)
+
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      Option(ctx.tableIdentifier().db).map(_.getText),
+      ctx.tableIdentifier.table.getText,
+      tblProperties.toMap,
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+  }
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    super.visitCreateTable(ctx)
+  }
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
+    withOrigin(ctx) {
+      if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
+          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
+        super.visitShowTables(ctx)
+      } else {
+        CarbonShowTablesCommand(
+          Option(ctx.db).map(_.getText),
+          Option(ctx.pattern).map(string))
+      }
+    }
+  }
+}
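
As a usage sketch for the builder above (statements and table names are
hypothetical): visitChangeColumn only accepts a type change that keeps the
column name, and SHOW TABLES is rewritten to CarbonShowTablesCommand whenever
CARBON_SHOW_DATAMAPS is false:

    // Accepted: both column names match, only the data type changes.
    spark.sql("ALTER TABLE carbon_t1 CHANGE price price DECIMAL(12,2)")
    // Rejected with MalformedCarbonCommandException: the column names differ.
    spark.sql("ALTER TABLE carbon_t1 CHANGE price cost DECIMAL(12,2)")
    // Routed to CarbonShowTablesCommand when CARBON_SHOW_DATAMAPS is false.
    spark.sql("SHOW TABLES")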