You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 17:50:35 UTC

[13/18] carbondata git commit: [CARBONDATA-1573] [Integration] Support Database Location Configuration while Creating Database/ Support Creation of carbon Table in the database location

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 2671aad..c7db436 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -70,7 +70,6 @@ case class CarbonDictionaryDecoder(
 
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
-      val storePath = CarbonEnv.getInstance(sparkSession).storePath
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -81,7 +80,7 @@ case class CarbonDictionaryDecoder(
         child.execute().mapPartitions { iter =>
           val cacheProvider: CacheProvider = CacheProvider.getInstance
           val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-            cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+            cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
           val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
             forwardDictionaryCache)
           val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
@@ -124,7 +123,6 @@ case class CarbonDictionaryDecoder(
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
 
-    val storePath = CarbonEnv.getInstance(sparkSession).storePath
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -133,9 +131,9 @@ case class CarbonDictionaryDecoder(
     if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
       val cacheProvider: CacheProvider = CacheProvider.getInstance
       val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
       val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(absoluteTableIdentifiers,
-        forwardDictionaryCache, storePath)
+        forwardDictionaryCache)
 
       val exprs = child.output.map { exp =>
         ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
@@ -252,7 +250,7 @@ case class CarbonDictionaryDecoder(
       if (f._2 != null) {
         try {
           cache.get(new DictionaryColumnUniqueIdentifier(
-            atiMap(f._1).getCarbonTableIdentifier,
+            atiMap(f._1),
             f._2, f._3.getDataType,
             CarbonStorePath.getCarbonTablePath(atiMap(f._1))))
         } catch {
@@ -266,33 +264,31 @@ case class CarbonDictionaryDecoder(
   }
 
   private def getDictionaryWrapper(atiMap: Map[String, AbsoluteTableIdentifier],
-      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary], storePath: String) = {
+      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val allDictIdentifiers = new ArrayBuffer[DictionaryColumnUniqueIdentifier]()
     val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryColumnIds.map {
       case (tableName, columnIdentifier, carbonDimension) =>
         if (columnIdentifier != null) {
           try {
-            val (newCarbonTableIdentifier, newColumnIdentifier) =
+            val (newAbsoluteTableIdentifier, newColumnIdentifier) =
               if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
                   !carbonDimension
                     .getColumnSchema.getParentColumnTableRelations.isEmpty) {
-                (QueryUtil.getTableIdentifierForColumn(carbonDimension),
+                (QueryUtil.getTableIdentifierForColumn(carbonDimension, atiMap(tableName)),
                   new ColumnIdentifier(carbonDimension.getColumnSchema
                     .getParentColumnTableRelations.get(0).getColumnId,
                     carbonDimension.getColumnProperties,
                     carbonDimension.getDataType))
               } else {
-                (atiMap(tableName).getCarbonTableIdentifier, columnIdentifier)
+                (atiMap(tableName), columnIdentifier)
               }
             val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
-              newCarbonTableIdentifier,
+              newAbsoluteTableIdentifier,
               newColumnIdentifier, carbonDimension.getDataType,
               CarbonStorePath
-                .getCarbonTablePath(atiMap(tableName).getStorePath, newCarbonTableIdentifier))
+                .getCarbonTablePath(newAbsoluteTableIdentifier))
             allDictIdentifiers += dictionaryColumnUniqueIdentifier
-            new ForwardDictionaryWrapper(
-              storePath,
-              dictionaryColumnUniqueIdentifier)
+            new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier)
           } catch {
             case _: Throwable => null
           }
@@ -300,7 +296,7 @@ case class CarbonDictionaryDecoder(
           null
         }
     }
-    val dictionaryLoader = new DictionaryLoader(storePath, allDictIdentifiers.toList)
+    val dictionaryLoader = new DictionaryLoader(allDictIdentifiers.toList)
     dicts.foreach { dict =>
       if (dict != null) {
         dict.setDictionaryLoader(dictionaryLoader)
@@ -467,7 +463,6 @@ class CarbonDecoderRDD(
     aliasMap: CarbonAliasDecoderRelation,
     prev: RDD[InternalRow],
     output: Seq[Attribute],
-    storePath: String,
     serializedTableInfo: Array[Byte])
   extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
 
@@ -516,7 +511,7 @@ class CarbonDecoderRDD(
 
     val cacheProvider: CacheProvider = CacheProvider.getInstance
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
     val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
       forwardDictionaryCache)
     val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
@@ -559,7 +554,7 @@ class CarbonDecoderRDD(
       if (f._2 != null) {
         try {
           cache.get(new DictionaryColumnUniqueIdentifier(
-            atiMap(f._1).getCarbonTableIdentifier,
+            atiMap(f._1),
             f._2, f._3.getDataType,
             CarbonStorePath.getCarbonTablePath(atiMap(f._1))))
         } catch {
@@ -578,10 +573,9 @@ class CarbonDecoderRDD(
 /**
  * It is a wrapper around Dictionary, it is a work around to keep the dictionary serializable in
  * case of codegen
- * @param storePath
+ * @param dictIdentifier Dictionary column unique identifier
  */
 class ForwardDictionaryWrapper(
-    val storePath: String,
     dictIdentifier: DictionaryColumnUniqueIdentifier) extends Serializable {
 
   var dictionary: Dictionary = null
@@ -610,8 +604,8 @@ class ForwardDictionaryWrapper(
 /**
  * It is Dictionary Loader class to load all dictionaries at a time instead of one by one.
  */
-class DictionaryLoader(storePath: String,
-    allDictIdentifiers: List[DictionaryColumnUniqueIdentifier]) extends Serializable {
+class DictionaryLoader(allDictIdentifiers: List[DictionaryColumnUniqueIdentifier])
+  extends Serializable {
 
   var isDictionaryLoaded = false
 
@@ -621,7 +615,7 @@ class DictionaryLoader(storePath: String,
     if (!isDictionaryLoaded) {
       val cacheProvider: CacheProvider = CacheProvider.getInstance
       val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
       allDicts = forwardDictionaryCache.getAll(allDictIdentifiers.asJava)
       isDictionaryLoaded = true
       val dictionaryTaskCleaner = TaskContext.get

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index de01c8d..fba590e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -222,10 +222,21 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
     // check "tablePath" option
     val tablePathOption = parameters.get("tablePath")
+    val dbName: String = parameters.getOrElse("dbName",
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val tableOption: Option[String] = parameters.get("tableName")
+    if (tableOption.isEmpty) {
+      throw new CarbonStreamException("Table creation failed. Table name is not specified")
+    }
+    val tableName = tableOption.get.toLowerCase()
+    if (tableName.contains(" ")) {
+      throw new CarbonStreamException("Table creation failed. Table name cannot contain blank " +
+                                      "space")
+    }
     if (tablePathOption.isDefined) {
       val sparkSession = sqlContext.sparkSession
       val identifier: AbsoluteTableIdentifier =
-        AbsoluteTableIdentifier.fromTablePath(tablePathOption.get)
+        AbsoluteTableIdentifier.from(tablePathOption.get, dbName, tableName)
       val carbonTable =
         CarbonEnv.getInstance(sparkSession).carbonMetastore.
           createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
@@ -303,18 +314,20 @@ object CarbonSource {
     val tableName: String = properties.getOrElse("tableName", "").toLowerCase
     val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
     val tableInfo: TableInfo = TableNewProcessor(model)
-    val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName
+    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.
       getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
     val map = if (metaStore.isReadFromHiveMetaStore) {
-      val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
       val schemaMetadataPath =
         CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
       tableInfo.setMetaDataFilepath(schemaMetadataPath)
-      tableInfo.setStorePath(tableIdentifier.getStorePath)
+      tableInfo.setTablePath(tableIdentifier.getTablePath)
       CarbonUtil.convertToMultiStringMap(tableInfo)
     } else {
       metaStore.saveToDisk(tableInfo, tablePath)
@@ -322,6 +335,7 @@ object CarbonSource {
     }
     properties.foreach(e => map.put(e._1, e._2))
     map.put("tablePath", tablePath)
+    map.put("dbname", dbName)
     map.asScala.toMap
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index f5c6cba..197b23b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -39,9 +39,11 @@ case class CarbonCreateTableCommand(
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     val storePath = CarbonEnv.getInstance(sparkSession).storePath
     CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(storePath)
+      checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
+    val dbLocation = GetDB.getDatabaseLocation(cm.databaseName, sparkSession, storePath)
+    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + cm.tableName
     val tbName = cm.tableName
     val dbName = cm.databaseName
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
@@ -70,11 +72,10 @@ case class CarbonCreateTableCommand(
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
     } else {
-      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
+      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tbName)
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val tablePath = tableIdentifier.getTablePath
-      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
       if (createDSTable) {
         try {
           val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
@@ -89,10 +90,9 @@ case class CarbonCreateTableCommand(
             s""""$tablePath"$carbonSchemaString) """)
         } catch {
           case e: Exception =>
-            val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
             CarbonEnv.getInstance(sparkSession).carbonMetastore
-              .dropTable(tablePath, identifier)(sparkSession)
+              .dropTable(tableIdentifier)(sparkSession)
 
             LOGGER.audit(s"Table creation with Database name [$dbName] " +
                          s"and Table name [$tbName] failed")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 1bf17b3..0343393 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -53,14 +54,16 @@ case class CarbonDropTableCommand(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
-    val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
-        dbName.toLowerCase, tableName.toLowerCase)
-    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val absoluteTableIdentifier = AbsoluteTableIdentifier
+      .from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables()
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+        lock => carbonLocks += CarbonLockUtil.getLockObject(absoluteTableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       val carbonTable: Option[CarbonTable] =
@@ -98,7 +101,7 @@ case class CarbonDropTableCommand(
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
       CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
+        .dropTable(absoluteTableIdentifier)(sparkSession)
 
       // fires the event after dropping main table
       val dropTablePostEvent: DropTablePostEvent =
@@ -127,8 +130,10 @@ case class CarbonDropTableCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     val metadataFilePath =
       CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
     val fileType = FileFactory.getFileType(metadataFilePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index dc3b1ae..66f2756 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -63,14 +64,16 @@ case class CarbonDropDataMapCommand(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
-        dbName.toLowerCase, tableName.toLowerCase)
-    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+      AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables()
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+        lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
       val carbonTable: Option[CarbonTable] =
@@ -140,8 +143,10 @@ case class CarbonDropDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index f87e734..947cea1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -71,7 +71,7 @@ case class AlterTableCompactionCommand(
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
     carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
+    carbonLoadModel.setTablePath(relation.tableMeta.carbonTable.getTablePath)
 
     var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
@@ -131,7 +131,7 @@ case class AlterTableCompactionCommand(
       // Just launch job to merge index and return
       CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
         carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
-        carbonLoadModel.getStorePath,
+        carbonLoadModel.getTablePath,
         carbonTable)
       return
     }
@@ -172,7 +172,7 @@ case class AlterTableCompactionCommand(
     } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
-        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
           LockUsage.COMPACTION_LOCK
         )
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 1b16b88..8b0dab7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -36,10 +36,14 @@ case class CleanFilesCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     if (forceTableClean) {
+      val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+      val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+        CarbonEnv.getInstance(sparkSession).storePath)
+      // TODO: TAABLEPATH
       CarbonStore.cleanFiles(
-        GetDB.getDatabaseName(databaseNameOp, sparkSession),
+        dbName,
         tableName,
-        CarbonEnv.getInstance(sparkSession).storePath,
+        databaseLocation,
         null,
         forceTableClean)
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 3853b5f..777c169 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -192,7 +192,7 @@ case class LoadTableCommand(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
-          val partitionLocation = table.getStorePath + "/partition/" +
+          val partitionLocation = relation.tableMeta.storePath + "/partition/" +
                                   table.getDatabaseName + "/" +
                                   table.getFactTableName + "/"
           val fileType = FileFactory.getFileType(partitionLocation)
@@ -231,7 +231,7 @@ case class LoadTableCommand(
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
     val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonLoadModel.getStorePath, carbonTableIdentifier)
+      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
     val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
     val dimensions = carbonTable.getDimensionByTableName(
       carbonTable.getFactTableName).asScala.toArray
@@ -245,7 +245,7 @@ case class LoadTableCommand(
         dimensions,
         carbonLoadModel,
         sparkSession.sqlContext,
-        carbonLoadModel.getStorePath,
+        carbonLoadModel.getTablePath,
         dictFolderPath)
     }
     if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
@@ -253,7 +253,7 @@ case class LoadTableCommand(
       GlobalDictionaryUtil
         .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
           carbonLoadModel,
-          carbonLoadModel.getStorePath,
+          carbonLoadModel.getTablePath,
           carbonTableIdentifier,
           dictFolderPath,
           dimensions,
@@ -289,7 +289,7 @@ case class LoadTableCommand(
     }
     CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
       carbonLoadModel,
-      carbonLoadModel.getStorePath,
+      carbonLoadModel.getTablePath,
       columnar,
       partitionStatus,
       server,
@@ -332,11 +332,11 @@ case class LoadTableCommand(
     GlobalDictionaryUtil.generateGlobalDictionary(
       sparkSession.sqlContext,
       carbonLoadModel,
-      carbonLoadModel.getStorePath,
+      carbonLoadModel.getTablePath,
       dictionaryDataFrame)
     CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
       carbonLoadModel,
-      carbonLoadModel.getStorePath,
+      carbonLoadModel.getTablePath,
       columnar,
       partitionStatus,
       None,
@@ -351,8 +351,7 @@ case class LoadTableCommand(
       model: DictionaryLoadModel,
       noDictDimension: Array[CarbonDimension]): Unit = {
     val sparkSession = sqlContext.sparkSession
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
-      model.table)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.table)
 
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     // read TableInfo
@@ -374,11 +373,12 @@ case class LoadTableCommand(
       tableInfo, entry, carbonTablePath.getPath)(sparkSession)
 
     // update the schema modified time
-    metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
+    metastore.updateAndTouchSchemasUpdatedTime()
 
+    val identifier = model.table.getCarbonTableIdentifier
     // update CarbonDataLoadSchema
-    val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
-      model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
+    val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
+      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 0c39dd4..a52008a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -76,14 +76,10 @@ object DeleteExecution {
       .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
       asInstanceOf[CarbonRelation]
 
-    val storeLocation = relation.tableMeta.storePath
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new
-        AbsoluteTableIdentifier(storeLocation,
-          relation.tableMeta.carbonTableIdentifier)
-    val tablePath = CarbonStorePath.getCarbonTablePath(
-      storeLocation,
-      absoluteTableIdentifier.getCarbonTableIdentifier)
-    val factPath = tablePath.getFactDir
+    val absoluteTableIdentifier = relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(absoluteTableIdentifier)
+    val factPath = carbonTablePath.getFactDir
 
     val carbonTable = relation.tableMeta.carbonTable
     var deleteStatus = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index 764deb7..a898822 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -60,7 +60,7 @@ private[sql] case class ProjectForDeleteCommand(
     OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
 
     val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
     var lockStatus = false
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index e48693b..549c58f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -67,7 +67,7 @@ private[sql] case class ProjectForUpdateCommand(
     OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
 
     val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
     var lockStatus = false
     // get the current time stamp which should be same for delete and update.
@@ -83,9 +83,7 @@ private[sql] case class ProjectForUpdateCommand(
       else {
         throw new Exception("Table is locked for updation. Please try after some time")
       }
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getStorePath,
-        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // Get RDD.
 
       dataSet = if (isPersistEnabled) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index 9b16060..acd9bd3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -66,8 +66,8 @@ case class AlterTableDropCarbonPartitionCommand(
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
     val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val storePath = relation.tableMeta.storePath
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    val tablePath = relation.tableMeta.tablePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
@@ -101,14 +101,14 @@ case class AlterTableDropCarbonPartitionCommand(
         sys.error(s"Dropping range interval partition isn't support yet!")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
 
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     val tableSchema = wrapperTableInfo.getFactTable
     tableSchema.setPartitionInfo(partitionInfo)
     wrapperTableInfo.setFactTable(tableSchema)
@@ -118,10 +118,10 @@ case class AlterTableDropCarbonPartitionCommand(
     thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis)
     carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
     // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     // sparkSession.catalog.refreshTable(tableName)
     Seq.empty
   }
@@ -152,7 +152,7 @@ case class AlterTableDropCarbonPartitionCommand(
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
       carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+      carbonLoadModel.setTablePath(relation.tableMeta.tablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       alterTableDropPartition(
@@ -224,7 +224,7 @@ case class AlterTableDropCarbonPartitionCommand(
       for (thread <- threadArray) {
         thread.join()
       }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
       refresher.refreshSegments(validSegments.asJava)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index c3a918c..0973226 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -70,11 +70,11 @@ case class AlterTableSplitCarbonPartitionCommand(
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
     val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val storePath = relation.tableMeta.storePath
+    val tablePath = relation.tableMeta.tablePath
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
     if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
@@ -95,13 +95,13 @@ case class AlterTableSplitCarbonPartitionCommand(
 
     updatePartitionInfo(partitionInfo, partitionIds)
 
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     val tableSchema = wrapperTableInfo.getFactTable
     tableSchema.setPartitionInfo(partitionInfo)
     wrapperTableInfo.setFactTable(tableSchema)
@@ -109,10 +109,10 @@ case class AlterTableSplitCarbonPartitionCommand(
     val thriftTable =
       schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
     carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
     // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, Option(dbName)))
     Seq.empty
   }
@@ -153,14 +153,14 @@ case class AlterTableSplitCarbonPartitionCommand(
       val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
         .asInstanceOf[CarbonRelation]
-      val storePath = relation.tableMeta.storePath
+      val tablePath = relation.tableMeta.tablePath
       val table = relation.tableMeta.carbonTable
       val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
       carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(storePath)
+      carbonLoadModel.setTablePath(tablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       alterTableSplitPartition(
@@ -232,7 +232,7 @@ case class AlterTableSplitCarbonPartitionCommand(
       threadArray.foreach {
         thread => thread.join()
       }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
       refresher.refreshSegments(validSegments.asJava)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index d693061..3193310 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -332,15 +332,15 @@ object PreAggregateUtil {
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,
           dbName,
           tableName,
-          carbonTable.getStorePath)
+          carbonTable.getTablePath)
       numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
@@ -399,7 +399,7 @@ object PreAggregateUtil {
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
-        acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+        acquiredLocks += CarbonLockUtil.getLockObject(table.getAbsoluteTableIdentifier, lock)
       }
       acquiredLocks.toList
     } catch {
@@ -439,13 +439,12 @@ object PreAggregateUtil {
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     carbonTable.getTableLastUpdatedTime
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-      carbonTable.getCarbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
       metastore
-        .revertTableSchemaForPreAggCreationFailure(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+        .revertTableSchemaForPreAggCreationFailure(carbonTable.getAbsoluteTableIdentifier,
+          thriftTable)(sparkSession)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 7cc43d2..2132131 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -65,26 +65,25 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,
           dbName,
           tableName,
-          carbonTable.getStorePath)
+          carbonTable.getTablePath)
       newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
         carbonTablePath,
         carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath, sparkSession.sparkContext).process
+        sparkSession.sparkContext).process
       // generate dictionary files for the newly added columns
       new AlterTableAddColumnRDD(sparkSession.sparkContext,
         newCols,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
+        carbonTable.getAbsoluteTableIdentifier).collect()
       timeStamp = System.currentTimeMillis
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
       schemaEvolutionEntry.setTimeStamp(timeStamp)
@@ -105,8 +104,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
           new AlterTableDropColumnRDD(sparkSession.sparkContext,
             newCols,
-            carbonTable.getCarbonTableIdentifier,
-            carbonTable.getStorePath).collect()
+            carbonTable.getAbsoluteTableIdentifier).collect()
           AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
         }
         sys.error(s"Alter table add operation failed: ${e.getMessage}")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 023e061..e44899e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -74,8 +74,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
         sys.error(s"Invalid Column: $columnName")
       }
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 0b737bf..dae2d7b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -110,8 +110,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)
 
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: org.apache.carbondata.format.TableInfo =
         metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       // maintain the deleted columns for schema evolution history
@@ -138,8 +138,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       // delete dictionary files for dictionary column and clear dictionary cache from memory
       new AlterTableDropColumnRDD(sparkSession.sparkContext,
         dictionaryColumns,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
+        carbonTable.getAbsoluteTableIdentifier).collect()
 
       // event will be fired before dropping the columns
       val alterTableDropColumnPostEvent: AlterTableDropColumnPostEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c2e5cf0..f1cce13 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -85,7 +85,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         .asInstanceOf[CarbonRelation].tableMeta
       carbonTable = tableMeta.carbonTable
       // invalid data map for the old table, see CARBON-1690
-      val oldTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)
+      val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
@@ -106,6 +106,21 @@ private[sql] case class CarbonAlterTableRenameCommand(
       schemaEvolutionEntry.setTime_stamp(timeStamp)
       renameBadRecords(oldTableName, newTableName, oldDatabaseName)
       val fileType = FileFactory.getFileType(tableMetadataFile)
+      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+      var newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, newTableIdentifier)
+
+      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+          s"('tableName'='$newTableName', " +
+          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+      // changed the rename order to deal with situation when carbon table and hive table
+      // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
         val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
@@ -115,23 +130,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
           sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
         }
       }
-      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
-        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
+      newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,
         tableMeta.tablePath)(sparkSession)
 
-      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
-          s"('tableName'='$newTableName', " +
-          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,
         alterTableRenameModel,
@@ -150,7 +154,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
           AlterTableUtil
             .revertRenameTableChanges(oldTableIdentifier,
               newTableName,
-              carbonTable.getStorePath,
+              carbonTable.getTablePath,
               carbonTable.getCarbonTableIdentifier.getTableId,
               timeStamp)(
               sparkSession)
@@ -167,7 +171,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
             locksToBeAcquired,
             oldDatabaseName,
             newTableName,
-            carbonTable.getStorePath)
+            carbonTable.getTablePath)
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index c6ca950..c126b25 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -102,7 +102,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       CarbonAliasDecoderRelation(),
       rdd,
       output,
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
       table.carbonTable.getTableInfo.serialize())
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index e39ba73..ef2e0a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsComm
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 
-import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
@@ -77,7 +76,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       _, child: LogicalPlan, overwrite, _) =>
         ExecutedCommandExec(LoadTableByInsertCommand(relation, child, overwrite.enabled)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0343402..6d80a26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,11 +23,10 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -39,12 +38,11 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -192,11 +190,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
   private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
-    val storePath = identifier.getStorePath
+    val tablePath = identifier.getTablePath
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
       tableName.toLowerCase(), UUID.randomUUID().toString)
     val carbonTablePath =
-      CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+      CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -204,16 +202,16 @@ class CarbonFileMetastore extends CarbonMetaStore {
       val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl
       val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
       val schemaFilePath = CarbonStorePath
-        .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-      wrapperTableInfo.setStorePath(storePath)
+        .getCarbonTablePath(tablePath, carbonTableIdentifier).getSchemaFilePath
+      wrapperTableInfo.setTablePath(tablePath)
       wrapperTableInfo
         .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
       val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
-        identifier.getStorePath,
+        identifier.getTablePath,
         identifier.getTablePath,
         carbonTable)
       metadata.tablesMeta += tableMeta
@@ -237,16 +235,19 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, oldTableIdentifier)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
+    val oldCarbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+    val newAbsoluteTableIdentifier = new AbsoluteTableIdentifier(CarbonUtil
+      .getNewTablePath(oldCarbonTablePath, newTableIdentifier), newTableIdentifier)
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
-        absoluteTableIdentifier.getStorePath)
+        newAbsoluteTableIdentifier.getTablePath)
     val identifier =
       new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
@@ -254,10 +255,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
       identifier)
-    addTableCache(wrapperTableInfo,
-      AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName))
+    addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
     path
   }
 
@@ -266,47 +264,44 @@ class CarbonFileMetastore extends CarbonMetaStore {
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param tablePath
    * @param sparkSession
    */
   def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+      absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         carbonTableIdentifier.getDatabaseName,
         carbonTableIdentifier.getTableName,
-        tableIdentifier.getStorePath)
+        absoluteTableIdentifier.getTablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      tableIdentifier.getCarbonTableIdentifier)
-    addTableCache(wrapperTableInfo, tableIdentifier)
+      absoluteTableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
   }
 
-  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
-  CarbonTableIdentifier,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+  override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
+  AbsoluteTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo)
+    (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
-        carbonTableIdentifier.getDatabaseName,
-        carbonTableIdentifier.getTableName,
-        tableIdentifier.getStorePath)
+        absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+        absoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+        absoluteTableIdentifier.getTablePath)
     val childSchemaList = wrapperTableInfo.getDataMapSchemaList
     childSchemaList.remove(childSchemaList.size() - 1)
-    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      tableIdentifier.getCarbonTableIdentifier)
-    addTableCache(wrapperTableInfo, tableIdentifier)
+      absoluteTableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
 
   }
@@ -323,8 +318,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    tableInfo.setStorePath(identifier.getStorePath)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+    tableInfo.setTablePath(identifier.getTablePath)
     createSchemaThriftFile(tableInfo,
       thriftTableInfo,
       identifier.getCarbonTableIdentifier)
@@ -335,19 +330,18 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * Generates schema string from TableInfo
    */
   override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
-      tablePath: String): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+      absoluteTableIdentifier: AbsoluteTableIdentifier): String = {
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(tableIdentifier.getStorePath)
+    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
     removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    addTableCache(tableInfo, tableIdentifier)
+    addTableCache(tableInfo, absoluteTableIdentifier)
     CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
   }
 
@@ -362,7 +356,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: TableInfo,
       carbonTableIdentifier: CarbonTableIdentifier): String = {
     val carbonTablePath = CarbonStorePath.
-      getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
+      getCarbonTablePath(tableInfo.getTablePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -374,7 +368,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime())
     carbonTablePath.getPath
   }
 
@@ -384,7 +378,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getTablePath,
       absoluteTableIdentifier.getTablePath,
       CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
     metadata.tablesMeta += tableMeta
@@ -424,16 +418,16 @@ class CarbonFileMetastore extends CarbonMetaStore {
   }
 
   def updateMetadataByThriftTable(schemaFilePath: String,
-      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+      tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = {
 
     tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis())
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
     wrapperTableInfo
       .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-    wrapperTableInfo.setStorePath(storePath)
+    wrapperTableInfo.setTablePath(tablePath)
     updateMetadataByWrapperTable(wrapperTableInfo)
   }
 
@@ -446,17 +440,17 @@ class CarbonFileMetastore extends CarbonMetaStore {
       FileFactory.isFileExist(tablePath, fileType)
     } catch {
       case e: Exception =>
-        false
+       false
     }
   }
 
 
-  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession) {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
+    val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+      .getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -467,21 +461,28 @@ class CarbonFileMetastore extends CarbonMetaStore {
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+      checkSchemasModifiedTimeAndReloadTables()
 
       removeTableFromMetadata(dbName, tableName)
-
-      updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
+      updateSchemasUpdatedTime(touchSchemaFileSystemTime())
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
+      val tableIdentifier = TableIdentifier(tableName, Option(dbName))
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-      DataMapStoreManager.getInstance().clearDataMaps(identifier)
+      DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     }
   }
 
-  private def getTimestampFileAndType(basePath: String) = {
+  private def getTimestampFileAndType() = {
+    var basePath = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
+        CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
+    basePath = CarbonUtil.checkAndAppendFileSystemURIScheme(basePath)
     val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
+    if (!FileFactory.isFileExist(basePath, timestampFileType)) {
+      FileFactory.mkdirs(basePath, timestampFileType)
+    }
     (timestampFile, timestampFileType)
   }
 
@@ -494,8 +495,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
   }
 
-  def updateAndTouchSchemasUpdatedTime(basePath: String) {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
+  def updateAndTouchSchemasUpdatedTime() {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime())
   }
 
 
@@ -504,10 +505,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
    *
    * @return
    */
-  private def touchSchemaFileSystemTime(basePath: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
+  private def touchSchemaFileSystemTime(): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType()
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $basePath")
+      LOGGER.audit(s"Creating timestamp file for $timestampFile")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
     FileFactory.getCarbonFile(timestampFile, timestampFileType)
@@ -518,9 +519,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
       .getLastModifiedTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
-    val (timestampFile, timestampFileType) =
-      getTimestampFileAndType(storePath)
+  def checkSchemasModifiedTimeAndReloadTables() {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType()
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a500f00..dedaf1c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -18,7 +18,8 @@ package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
@@ -33,7 +34,7 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
 
 /**
  * Metastore to store carbonschema in hive
@@ -56,7 +57,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
       val meta = new TableMeta(table.getCarbonTableIdentifier,
-        absIdentifier.getStorePath, absIdentifier.getTablePath, table)
+        absIdentifier.getTablePath, absIdentifier.getTablePath, table)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
         CarbonSparkUtil.createSparkMeta(table), meta)
     } else {
@@ -70,25 +71,25 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     tableExists(tableIdentifier)(sparkSession)
   }
 
-  override def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+  override def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession): Unit = {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
-    checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+    checkSchemasModifiedTimeAndReloadTables()
     removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
+    val tableIdentifier = TableIdentifier(tableName, Option(dbName))
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    DataMapStoreManager.getInstance().clearDataMaps(identifier)
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
   }
 
-  override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+  override def checkSchemasModifiedTimeAndReloadTables() {
     // do nothing now
   }
 
@@ -125,14 +126,13 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       tablePath: String)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
     updateHiveMetaStoreForAlter(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      tablePath,
       sparkSession,
       schemaConverter)
   }
@@ -142,19 +142,18 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
    *
    * @param newTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
+   * @param carbonTablePath
    * @param sparkSession
    */
   override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)(sparkSession: SparkSession): String = {
+      carbonTablePath: String)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(carbonStorePath)
     updateHiveMetaStoreForDataMap(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      carbonTablePath,
       sparkSession,
       schemaConverter)
   }
@@ -165,13 +164,14 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       carbonStorePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val tablePath = CarbonUtil.getNewTablePath(new Path(carbonStorePath), newTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, newTableIdentifier)
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
-        carbonStorePath)
-    wrapperTableInfo.setStorePath(carbonStorePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+        carbonTablePath.toString)
+    wrapperTableInfo.setTablePath(carbonStorePath)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -189,16 +189,17 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
   private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      carbonStorePath: String,
+      tablePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier)
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
-        carbonStorePath)
-    wrapperTableInfo.setStorePath(carbonStorePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+        newTablePath)
+    wrapperTableInfo.setTablePath(newTablePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(newTablePath, newTableIdentifier)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -207,7 +208,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
+    carbonTablePath.getPath
   }
 
   /**
@@ -219,32 +220,31 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
    */
   override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      tablePath: String)
+      identifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
     updateHiveMetaStoreForAlter(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      identifier.getTablePath,
       sparkSession,
       schemaConverter)
   }
 
-  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
-  CarbonTableIdentifier,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String = {
+  override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
+  AbsoluteTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo)
+    (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val childSchemas = thriftTableInfo.dataMapSchemas
     childSchemas.remove(childSchemas.size())
+    val carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier
     updateHiveMetaStoreForAlter(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      absoluteTableIdentifier.getTablePath,
       sparkSession,
       schemaConverter)
   }