You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 17:50:34 UTC

[12/18] carbondata git commit: [CARBONDATA-1573] [Integration] Support Database Location Configuration while Creating Database/ Support Creation of carbon Table in the database location

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 24996ed..357a812 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -90,18 +90,17 @@ trait CarbonMetaStore {
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param tablePath
+   * @param absoluteTableIdentifier
    * @param sparkSession
    */
   def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)
+      absoluteTableIdentifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession): String
 
 
-  def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: CarbonTableIdentifier,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String
+  def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier: AbsoluteTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo)(sparkSession: SparkSession): String
   /**
    * Prepare Thrift Schema from wrapper TableInfo and write to disk
    */
@@ -113,7 +112,7 @@ trait CarbonMetaStore {
    * @return
    */
   def generateTableSchemaString(tableInfo: schema.table.TableInfo,
-      tablePath: String): String
+      absoluteTableIdentifier: AbsoluteTableIdentifier): String
 
   /**
    * This method will remove the table meta from catalog metadata array
@@ -128,12 +127,12 @@ trait CarbonMetaStore {
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
 
-  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(tableIdentifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession)
 
-  def updateAndTouchSchemasUpdatedTime(basePath: String)
+  def updateAndTouchSchemasUpdatedTime()
 
-  def checkSchemasModifiedTimeAndReloadTables(storePath: String)
+  def checkSchemasModifiedTimeAndReloadTables()
 
   def isReadFromHiveMetaStore : Boolean
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 825f6ed..e587395 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -108,7 +108,7 @@ class CarbonSessionCatalog(
     var isRefreshed = false
     val storePath = CarbonEnv.getInstance(sparkSession).storePath
     carbonEnv.carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(storePath)
+      checkSchemasModifiedTimeAndReloadTables()
 
     val tableMeta = carbonEnv.carbonMetastore
       .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7d25efd..b76b24e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 
@@ -38,15 +39,23 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     if (sparkSession.sessionState.catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) {
       tablesInDB = sparkSession.sessionState.catalog.listTables(dbName)
     }
+    var databaseLocation = ""
+    try {
+      databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+        CarbonEnv.getInstance(sparkSession).storePath)
+    } catch {
+      case e: NoSuchDatabaseException =>
+        // Ignored here: command.run (invoked below) handles the missing database; use store path
+      databaseLocation = CarbonEnv.getInstance(sparkSession).storePath
+    }
     // DropHiveDB command will fail if cascade is false and one or more table exists in database
-    val rows = command.run(sparkSession)
     if (command.cascade && tablesInDB != null) {
       tablesInDB.foreach { tableName =>
         CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
       }
     }
-    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase,
-      CarbonEnv.getInstance(sparkSession).storePath)
+    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, databaseLocation)
+    val rows = command.run(sparkSession)
     rows
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index bda4eeb..153b169 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -30,14 +31,13 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.format.TableInfo
 
 object AlterTableUtil {
 
@@ -69,7 +69,7 @@ object AlterTableUtil {
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
-        acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+        acquiredLocks += CarbonLockUtil.getLockObject(table.getAbsoluteTableIdentifier, lock)
       }
       acquiredLocks.toList
     } catch {
@@ -102,15 +102,14 @@ object AlterTableUtil {
    * @param locksAcquired
    * @param dbName
    * @param tableName
-   * @param storeLocation
+   * @param tablePath
    */
   def releaseLocksManually(locks: List[ICarbonLock],
       locksAcquired: List[String],
       dbName: String,
       tableName: String,
-      storeLocation: String): Unit = {
-    val lockLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
-                       dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName
+      tablePath: String): Unit = {
+    val lockLocation = tablePath
     locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
       val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR +
                          lockType
@@ -185,20 +184,23 @@ object AlterTableUtil {
    */
   def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
       newTableName: String,
-      storePath: String,
+      tablePath: String,
       tableId: String,
       timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val oldCarbonTableIdentifier = new CarbonTableIdentifier(database,
+      oldTableIdentifier.table, tableId)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newCarbonTableIdentifier)
+    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newCarbonTableIdentifier)
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val tableMetadataFile = carbonTablePath.getPath
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
       val tableInfo = if (metastore.isReadFromHiveMetaStore) {
         // In case of hive metastore we first update the carbonschema inside old table only.
-        metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(storePath,
+        metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath,
           new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession)
       } else {
         metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -210,9 +212,10 @@ object AlterTableUtil {
         FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        oldTableIdentifier.table)
-        val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
-        metastore.revertTableSchemaInAlterFailure(tableIdentifier,
-          tableInfo, carbonTablePath.getPath)(sparkSession)
+        val absoluteTableIdentifier = new AbsoluteTableIdentifier(newTablePath,
+          newCarbonTableIdentifier)
+        metastore.revertTableSchemaInAlterFailure(oldCarbonTableIdentifier,
+          tableInfo, absoluteTableIdentifier)(sparkSession)
         metastore.removeTableFromMetadata(database, newTableName)
       }
     }
@@ -233,7 +236,7 @@ object AlterTableUtil {
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
 
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
@@ -244,7 +247,7 @@ object AlterTableUtil {
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
       metastore
         .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+          thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession)
     }
   }
 
@@ -262,7 +265,7 @@ object AlterTableUtil {
     val carbonTable = metastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
@@ -279,7 +282,7 @@ object AlterTableUtil {
       }
       metastore
         .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+          thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession)
     }
   }
 
@@ -297,7 +300,7 @@ object AlterTableUtil {
     val carbonTable = metastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
@@ -317,7 +320,7 @@ object AlterTableUtil {
       }
       metastore
         .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+          thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession)
     }
   }
 
@@ -351,7 +354,7 @@ object AlterTableUtil {
         .tableMeta.carbonTable
       // get the latest carbon table
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
         carbonTable.getCarbonTableIdentifier)
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
@@ -359,7 +362,7 @@ object AlterTableUtil {
         .fromExternalToWrapperTableInfo(thriftTableInfo,
           dbName,
           tableName,
-          carbonTable.getStorePath)
+          carbonTable.getTablePath)
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
       schemaEvolutionEntry.setTimeStamp(timeStamp)
       val thriftTable = schemaConverter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 421cd2e..dcfbaea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -61,7 +61,7 @@ object CleanFiles {
     }
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+      checkSchemasModifiedTimeAndReloadTables()
     cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 91121ce..709f474 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -58,7 +58,7 @@ object Compaction {
     val compactionType = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+      checkSchemasModifiedTimeAndReloadTables()
     compaction(spark, dbName, tableName, compactionType)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 4aaec8f..8375762 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -48,7 +48,7 @@ object DeleteSegmentByDate {
     val dateValue = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+      checkSchemasModifiedTimeAndReloadTables()
     deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index c86c7f5..9b87504 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -53,7 +53,7 @@ object DeleteSegmentById {
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+      checkSchemasModifiedTimeAndReloadTables()
     deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 501402b..13883ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -81,7 +81,7 @@ object TableLoader {
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+      checkSchemasModifiedTimeAndReloadTables()
     loadTable(spark, Option(dbName), tableName, inputPaths, map)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 04de9a3..23cba20 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -843,8 +843,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   }
 
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
-    val tablePath = new CarbonTablePath(carbonTable.getStorePath, carbonTable.getDatabaseName,
-      carbonTable.getFactTableName)
+    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+      carbonTable.getTablePath)
     val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 5c7d451..a3024be 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -64,7 +64,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
     // Create table and metadata folders if not exist
     val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
     val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index fd3b2cd..930de43 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
@@ -41,11 +41,12 @@ object DictionaryTestCaseUtil {
     val table = relation.tableMeta.carbonTable
     val dimension = table.getDimensionByName(table.getFactTableName, columnName)
     val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid")
-    val columnIdentifier = new DictionaryColumnUniqueIdentifier(tableIdentifier,
+    val  absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
+    val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
       dimension.getColumnIdentifier, dimension.getDataType,
       CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     )
-    val dict = CarbonLoaderUtil.getDictionary(columnIdentifier, TestQueryExecutor.storeLocation)
+    val dict = CarbonLoaderUtil.getDictionary(columnIdentifier)
     assert(dict.getSurrogateKey(value) != CarbonCommonConstants.INVALID_SURROGATE_KEY)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 399665f..d37a68b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -177,7 +177,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     carbonLoadModel.setMaxColumns("100")
     // Create table and metadata folders if not exist
     val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
     val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index deb6287..3fb1424 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -196,7 +196,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       thread1.start()
       // use thread pool to catch the exception of sink thread
       val pool = Executors.newSingleThreadExecutor()
-      val thread2 = createSocketStreamingThread(spark, tablePath)
+      val thread2 = createSocketStreamingThread(spark, tablePath, identifier)
       val future = pool.submit(thread2)
       Thread.sleep(1000)
       thread1.interrupt()
@@ -242,7 +242,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val csvDataDir = new File("target/csvdata").getCanonicalPath
     // streaming ingest 10 rows
     generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
-    val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1)
+    val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
+      identifier )
     thread.start()
     Thread.sleep(2000)
     generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
@@ -636,6 +637,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       tablePath: CarbonTablePath,
+      tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
       handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT): Thread = {
@@ -656,6 +658,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
             .option("tablePath", tablePath.getPath)
             .option("bad_records_action", badRecordAction)
+            .option("dbName", tableIdentifier.database.get)
+            .option("tableName", tableIdentifier.table)
             .option(CarbonStreamOutputFormat.HANDOFF_SIZE, handoffSize)
             .start()
           qry.awaitTermination()
@@ -698,7 +702,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         badRecords = generateBadRecords)
       val thread2 = createSocketStreamingThread(
         spark = spark,
-        tablePath = tablePath,
+        tablePath = tablePath, identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
         handoffSize = handoffSize)
@@ -740,7 +744,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       spark: SparkSession,
       tablePath: CarbonTablePath,
       csvDataDir: String,
-      intervalSecond: Int): Thread = {
+      intervalSecond: Int,
+      tableIdentifier: TableIdentifier): Thread = {
     new Thread() {
       override def run(): Unit = {
         val inputSchema = new StructType()
@@ -765,6 +770,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
             .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
             .option("tablePath", tablePath.getPath)
+            .option("dbName", tableIdentifier.database.get)
+            .option("tableName", tableIdentifier.table)
             .start()
 
           qry.awaitTermination()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index e574d7f..3b6b85d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -437,7 +437,9 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
   test("test to check if the lock file is successfully deleted") {
       sql("create table lock_check(id int, name string) stored by 'carbondata'")
     sql("alter table lock_check rename to lock_rename")
-    assert(!new File(s"${ CarbonCommonConstants.STORE_LOCATION } + /default/lock_rename/meta.lock")
+    // NOTE(review): this assumes CarbonCommonConstants.STORE_LOCATION resolves to the store
+    // directory itself; if it is only the property key, look the path up first - confirm.
+    assert(!new File(s"${ CarbonCommonConstants.STORE_LOCATION }/lock_rename/meta.lock")
       .exists())
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index adf4574..9f89226 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -26,7 +26,8 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 
 class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
 
@@ -115,7 +116,8 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
     createAndLoadTestTable(table, "csv_table")
     DeleteSegmentById.main(Array(s"${location}", table, "0"))
     CleanFiles.main(Array(s"${location}", table))
-    val tablePath = s"${location}${File.separator}default${File.separator}$table"
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_"+table)
+    val tablePath = carbonTable.getAbsoluteTableIdentifier.getTablePath
     val f = new File(s"$tablePath/Fact/Part0")
     assert(f.isDirectory)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 95d7d2e..f70e38e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -122,16 +122,16 @@ public class PrimitiveDataType implements GenericDataType<Object> {
    */
   public PrimitiveDataType(String name, String parentname, String columnId,
       CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      String storePath, Map<Object, Integer> localCache) {
+      AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      Map<Object, Integer> localCache) {
     this.name = name;
     this.parentname = parentname;
     this.columnId = columnId;
     this.carbonDimension = carbonDimension;
     DictionaryColumnUniqueIdentifier identifier =
-        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+        new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
             carbonDimension.getColumnIdentifier(), carbonDimension.getDataType(),
-            CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
+            CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier));
     try {
       if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
@@ -139,13 +139,14 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       } else {
         Dictionary dictionary = null;
         if (useOnePass) {
-          if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+          if (CarbonUtil.isFileExistsForGivenColumn(identifier)) {
             dictionary = cache.get(identifier);
           }
           DictionaryMessage dictionaryMessage = new DictionaryMessage();
           dictionaryMessage.setColumnName(carbonDimension.getColName());
           // for table initialization
-          dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
+          dictionaryMessage
+              .setTableUniqueId(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
           dictionaryMessage.setData("0");
           // for generate dictionary
           dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 05104a2..442d93e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -142,7 +142,7 @@ public final class DataLoadProcessBuilder {
     CarbonProperties.getInstance().addProperty(tempLocationKey,
         StringUtils.join(storeLocation, File.pathSeparator));
     CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getTablePath());
 
     return createConfiguration(loadModel);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
index 7045101..4ac8850 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -65,8 +65,8 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
 
   public DictionaryFieldConverterImpl(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
-      DictionaryClient client, boolean useOnePass, String storePath,
+      AbsoluteTableIdentifier absoluteTableIdentifier, String nullFormat, int index,
+      DictionaryClient client, boolean useOnePass,
       Map<Object, Integer> localCache, boolean isEmptyBadRecord,
       DictionaryColumnUniqueIdentifier identifier) throws IOException {
     this.index = index;
@@ -76,13 +76,14 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
 
     // if use one pass, use DictionaryServerClientDictionary
     if (useOnePass) {
-      if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+      if (CarbonUtil.isFileExistsForGivenColumn(identifier)) {
         dictionary = cache.get(identifier);
       }
       dictionaryMessage = new DictionaryMessage();
       dictionaryMessage.setColumnName(dataField.getColumn().getColName());
       // for table initialization
-      dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
+      dictionaryMessage
+          .setTableUniqueId(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
       dictionaryMessage.setData("0");
       // for generate dictionary
       dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 22d15d9..778e1b3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -33,7 +34,9 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -61,15 +64,15 @@ public class FieldEncoderFactory {
    *
    * @param dataField             column schema
    * @param cache                 dicionary cache.
-   * @param carbonTableIdentifier table identifier
+   * @param absoluteTableIdentifier table identifier
    * @param index                 index of column in the row.
    * @param isEmptyBadRecord
    * @return
    */
   public FieldConverter createFieldEncoder(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
-      DictionaryClient client, Boolean useOnePass, String storePath,
+      AbsoluteTableIdentifier absoluteTableIdentifier, int index, String nullFormat,
+      DictionaryClient client, Boolean useOnePass,
       Map<Object, Integer> localCache, boolean isEmptyBadRecord)
       throws IOException {
     // Converters are only needed for dimensions and measures it return null.
@@ -85,11 +88,11 @@ public class FieldEncoderFactory {
         // in case of child table it will use parent table dictionary
         if (null == dataField.getColumn().getColumnSchema().getParentColumnTableRelations()
             || dataField.getColumn().getColumnSchema().getParentColumnTableRelations().isEmpty()) {
-          identifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+          identifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
               dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(),
-              CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
-          return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier,
-              nullFormat, index, client, useOnePass, storePath, localCache, isEmptyBadRecord,
+              CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier));
+          return new DictionaryFieldConverterImpl(dataField, cache, absoluteTableIdentifier,
+              nullFormat, index, client, useOnePass, localCache, isEmptyBadRecord,
               identifier);
         } else {
           ParentColumnTableRelation parentColumnTableRelation =
@@ -103,17 +106,21 @@ public class FieldEncoderFactory {
           ColumnIdentifier parentColumnIdentifier =
               new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
                   dataField.getColumn().getDataType());
-          identifier =
-              new DictionaryColumnUniqueIdentifier(parentTableIdentifier, parentColumnIdentifier,
-                  dataField.getColumn().getDataType(),
-                  CarbonStorePath.getCarbonTablePath(storePath, parentTableIdentifier));
-          return new DictionaryFieldConverterImpl(dataField, cache, parentTableIdentifier,
-              nullFormat, index, null, false, storePath, null, isEmptyBadRecord, identifier);
+          CarbonTablePath carbonTablePath =
+              CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+          AbsoluteTableIdentifier parentAbsoluteTableIdentifier = new AbsoluteTableIdentifier(
+              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier),
+              parentTableIdentifier);
+          identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
+              parentColumnIdentifier, dataField.getColumn().getDataType(),
+              CarbonStorePath.getCarbonTablePath(parentAbsoluteTableIdentifier));
+          return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier,
+              nullFormat, index, null, false, null, isEmptyBadRecord, identifier);
         }
       } else if (dataField.getColumn().isComplex()) {
         return new ComplexFieldConverterImpl(
-            createComplexType(dataField, cache, carbonTableIdentifier,
-                client, useOnePass, storePath, localCache), index);
+            createComplexType(dataField, cache, absoluteTableIdentifier,
+                client, useOnePass, localCache), index);
       } else {
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
       }
@@ -127,10 +134,10 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      String storePath, Map<Object, Integer> localCache) {
+      AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      Map<Object, Integer> localCache) {
     return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
-        carbonTableIdentifier, client, useOnePass, storePath, localCache);
+        absoluteTableIdentifier, client, useOnePass, localCache);
   }
 
   /**
@@ -140,8 +147,8 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      String storePath, Map<Object, Integer> localCache) {
+      AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      Map<Object, Integer> localCache) {
     DataType dataType = carbonColumn.getDataType();
     if (DataTypes.isArrayType(dataType)) {
       List<CarbonDimension> listOfChildDimensions =
@@ -151,8 +158,8 @@ public class FieldEncoderFactory {
           new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
       for (CarbonDimension dimension : listOfChildDimensions) {
         arrayDataType.addChildren(
-            createComplexType(dimension, carbonColumn.getColName(), cache, carbonTableIdentifier,
-                client, useOnePass, storePath, localCache));
+            createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
+                client, useOnePass, localCache));
       }
       return arrayDataType;
     } else if (DataTypes.isStructType(dataType)) {
@@ -163,16 +170,16 @@ public class FieldEncoderFactory {
           new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
       for (CarbonDimension dimension : dimensions) {
         structDataType.addChildren(
-            createComplexType(dimension, carbonColumn.getColName(), cache, carbonTableIdentifier,
-                client, useOnePass, storePath, localCache));
+            createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
+                client, useOnePass, localCache));
       }
       return structDataType;
     } else if (DataTypes.isMapType(dataType)) {
       throw new UnsupportedOperationException("Complex type Map is not supported yet");
     } else {
       return new PrimitiveDataType(carbonColumn.getColName(), parentName,
-          carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache, carbonTableIdentifier,
-          client, useOnePass, storePath, localCache);
+          carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
+          absoluteTableIdentifier, client, useOnePass, localCache);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 79c6d61..16c4a22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -85,8 +85,7 @@ public class RowConverterImpl implements RowConverter {
   @Override
   public void initialize() throws IOException {
     CacheProvider cacheProvider = CacheProvider.getInstance();
-    cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
-        configuration.getTableIdentifier().getStorePath());
+    cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY);
     String nullFormat =
         configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
             .toString();
@@ -102,10 +101,8 @@ public class RowConverterImpl implements RowConverter {
     for (int i = 0; i < fields.length; i++) {
       localCaches[i] = new ConcurrentHashMap<>();
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
-          .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
-              configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
-              localCaches[i], isEmptyBadRecord);
+          .createFieldEncoder(fields[i], cache, configuration.getTableIdentifier(), i, nullFormat,
+              client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord);
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -210,10 +207,9 @@ public class RowConverterImpl implements RowConverter {
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = null;
       try {
-        fieldConverter = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
-            configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
-            configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
-            localCaches[i], isEmptyBadRecord);
+        fieldConverter = FieldEncoderFactory.getInstance()
+            .createFieldEncoder(fields[i], cache, configuration.getTableIdentifier(), i, nullFormat,
+                client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index acd7fed..8c3fe56 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -43,7 +43,7 @@ public class CarbonLoadModel implements Serializable {
 
   private boolean aggLoadRequest;
 
-  private String storePath;
+  private String tablePath;
 
   private boolean isRetentionRequest;
 
@@ -354,7 +354,7 @@ public class CarbonLoadModel implements Serializable {
     copy.dateFormat = dateFormat;
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
-    copy.storePath = storePath;
+    copy.tablePath = tablePath;
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
@@ -402,7 +402,7 @@ public class CarbonLoadModel implements Serializable {
     copy.dateFormat = dateFormat;
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
-    copy.storePath = storePath;
+    copy.tablePath = tablePath;
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
@@ -452,7 +452,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.dateFormat = dateFormat;
     copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
-    copyObj.storePath = storePath;
+    copyObj.tablePath = tablePath;
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
@@ -480,17 +480,17 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * @param storePath The storePath to set.
+   * @param tablePath The tablePath to set.
    */
-  public void setStorePath(String storePath) {
-    this.storePath = storePath;
+  public void setTablePath(String tablePath) {
+    this.tablePath = tablePath;
   }
 
   /**
    * @return Returns the factStoreLocation.
    */
-  public String getStorePath() {
-    return storePath;
+  public String getTablePath() {
+    return tablePath;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index aa77fb6..6da50a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -47,7 +47,7 @@ public abstract class AbstractResultProcessor {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
       int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
-          CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(),
+          CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
               carbonTable.getCarbonTableIdentifier()));
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c2ee1bc..c1df349 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -171,9 +171,7 @@ public final class CarbonDataMergerUtil {
     AbsoluteTableIdentifier absoluteTableIdentifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
 
     SegmentUpdateStatusManager segmentUpdateStatusManager =
         new SegmentUpdateStatusManager(absoluteTableIdentifier);
@@ -298,8 +296,7 @@ public final class CarbonDataMergerUtil {
             + carbonLoadModel.getTableName() + " for table status updation ");
 
         CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                absoluteTableIdentifier.getCarbonTableIdentifier());
+            .getCarbonTablePath(absoluteTableIdentifier);
 
         String statusFilePath = carbonTablePath.getTableStatusFilePath();
 
@@ -388,7 +385,7 @@ public final class CarbonDataMergerUtil {
   public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
       CarbonLoadModel carbonLoadModel, long compactionSize,
       List<LoadMetadataDetails> segments, CompactionType compactionType) {
-    String storeLocation = carbonLoadModel.getStorePath();
+    String tablePath = carbonLoadModel.getTablePath();
     List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
 
     sortSegments(sortedSegments);
@@ -413,7 +410,7 @@ public final class CarbonDataMergerUtil {
     if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
 
       listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
-          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
+          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath);
     } else {
 
       listOfSegmentsToBeMerged =
@@ -580,7 +577,7 @@ public final class CarbonDataMergerUtil {
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
       long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, String storeLocation) {
+      CarbonLoadModel carbonLoadModel, String tablePath) {
 
     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -603,7 +600,7 @@ public final class CarbonDataMergerUtil {
       String segId = segment.getLoadName();
       // variable to store one  segment size across partition.
       long sizeOfOneSegmentAcrossPartition =
-          getSizeOfSegment(storeLocation, tableIdentifier, segId);
+          getSizeOfSegment(tablePath, tableIdentifier, segId);
 
       // if size of a segment is greater than the Major compaction size. then ignore it.
       if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -646,9 +643,9 @@ public final class CarbonDataMergerUtil {
    * @param segId segment id
    * @return the data size of the segment
    */
-  private static long getSizeOfSegment(String storePath,
+  private static long getSizeOfSegment(String tablePath,
       CarbonTableIdentifier tableIdentifier, String segId) {
-    String loadPath = getStoreLocation(storePath, tableIdentifier, segId);
+    String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
     CarbonFile segmentFolder =
         FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
     return getSizeOfFactFileInLoad(segmentFolder);
@@ -657,15 +654,15 @@ public final class CarbonDataMergerUtil {
   /**
    * This method will get the store location for the given path, segemnt id and partition id
    *
-   * @param storePath the store path of the segment
+   * @param tablePath path of the table that the segment belongs to
    * @param carbonTableIdentifier identifier of catbon table that the segment belong to
    * @param segmentId segment id
    * @return the store location of the segment
    */
-  private static String getStoreLocation(String storePath,
+  private static String getStoreLocation(String tablePath,
       CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
     CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
+        CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
     return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
   }
 
@@ -1001,9 +998,7 @@ public final class CarbonDataMergerUtil {
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
 
     String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
     CarbonFile segDir =
@@ -1255,8 +1250,7 @@ public final class CarbonDataMergerUtil {
     AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
 
     CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
+            .getCarbonTablePath(absoluteTableIdentifier);
 
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index b6ac19d..4f9458c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -306,7 +306,7 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), loadModel.getDatabaseName(),
+        .checkAndCreateCarbonStoreLocation(loadModel.getTablePath(), loadModel.getDatabaseName(),
             tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
@@ -331,17 +331,13 @@ public class CarbonFactDataHandlerModel {
    * @return data directory path
    */
   private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
-    String carbonStorePath =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS);
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        tableIdentifier.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + tableIdentifier
-            .getTableName());
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTable.getCarbonTableIdentifier());
-    return carbonTablePath.getCarbonDataDirectoryPath(configuration.getPartitionId(),
-        configuration.getSegmentId() + "");
+    AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+    String carbonDataDirectoryPath = carbonTablePath
+        .getCarbonDataDirectoryPath(configuration.getPartitionId(),
+            configuration.getSegmentId() + "");
+    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+    return carbonDataDirectoryPath;
   }
 
   public int[] getColCardinality() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 5b6998a..29a979d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -89,7 +89,7 @@ public final class CarbonLoaderUtil {
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+        .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
       String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
@@ -109,7 +109,7 @@ public final class CarbonLoaderUtil {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
         .getCarbonTable();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+        loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     int fileCount = 0;
     int partitionCount = carbonTable.getPartitionCount();
@@ -145,7 +145,7 @@ public final class CarbonLoaderUtil {
     String metaDataLocation = carbonTable.getMetaDataFilepath();
     final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
     CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+        .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     //delete folder which metadata no exist in tablestatus
     for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
@@ -262,9 +262,7 @@ public final class CarbonLoaderUtil {
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -382,10 +380,10 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
+  public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier,
       List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
     CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(storeLocation, dbName, tableName);
+        CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
 
     DataOutputStream dataOutputStream;
@@ -427,20 +425,19 @@ public final class CarbonLoaderUtil {
     return date;
   }
 
-  public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
-      String carbonStorePath) throws IOException {
+  public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier)
+      throws IOException {
     Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
-        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
+        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY);
     return dictCache.get(columnIdentifier);
   }
 
-  public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
-      ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
+  public static Dictionary getDictionary(AbsoluteTableIdentifier absoluteTableIdentifier,
+      ColumnIdentifier columnIdentifier, DataType dataType)
       throws IOException {
     return getDictionary(
-        new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType,
-            CarbonStorePath.getCarbonTablePath(carbonStorePath, tableIdentifier)),
-        carbonStorePath);
+        new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier, dataType,
+            CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index ffdabce..09e7b47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -43,17 +43,14 @@ public final class DeleteLoadFolders {
   /**
    * returns segment path
    *
-   * @param dbName
-   * @param tableName
-   * @param storeLocation
+   * @param absoluteTableIdentifier identifier of the table whose segment path is resolved
    * @param partitionId
    * @param oneLoad
    * @return
    */
-  private static String getSegmentPath(String dbName, String tableName, String storeLocation,
+  private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
       int partitionId, LoadMetadataDetails oneLoad) {
-    CarbonTablePath carbon = new CarbonStorePath(storeLocation).getCarbonTablePath(
-        new CarbonTableIdentifier(dbName, tableName, ""));
+    CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String segmentId = oneLoad.getLoadName();
     return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
   }
@@ -125,15 +122,15 @@ public final class DeleteLoadFolders {
     return false;
   }
 
-  public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
-      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
-
+  public static boolean deleteLoadFoldersFromFileSystem(
+      AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
+      LoadMetadataDetails[] details) {
     boolean isDeleted = false;
 
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          String path = getSegmentPath(dbName, tableName, storeLocation, 0, oneLoad);
+          String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
           boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
           if (deletionStatus) {
             isDeleted = true;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index d15b45c..485e718 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -63,7 +63,7 @@ public class BlockIndexStoreTest extends TestCase {
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");
     CacheProvider cacheProvider = CacheProvider.getInstance();
-    cache = (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE, "");
+    cache = (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE);
   }
 
   @AfterClass public void tearDown() {
@@ -258,9 +258,7 @@ public class BlockIndexStoreTest extends TestCase {
   }
 
   private static File getPartFile() {
-    String path = StoreCreator.getAbsoluteTableIdentifier().getStorePath() + "/" + StoreCreator
-        .getAbsoluteTableIdentifier().getCarbonTableIdentifier().getDatabaseName() + "/"
-        + StoreCreator.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableName()
+    String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
         + "/Fact/Part0/Segment_0";
     File file = new File(path);
     File[] files = file.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
index ddc8657..ba77f29 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.lcm.locks;
 
 import java.io.File;
 
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.locks.LocalFileLock;
 import org.apache.carbondata.core.locks.LockUsage;
@@ -51,13 +52,15 @@ public class LocalFileLockTest {
 
   @Test public void testingLocalFileLockingByAcquiring2Locks() {
 
-	CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("databaseName", "tableName", "tableId");
+    AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
+        .from(CarbonProperties.getInstance().getProperty("carbon.storelocation"), "databaseName",
+            "tableName");
     LocalFileLock localLock1 =
-        new LocalFileLock(carbonTableIdentifier,
+        new LocalFileLock(absoluteTableIdentifier,
             LockUsage.METADATA_LOCK);
     Assert.assertTrue(localLock1.lock());
     LocalFileLock localLock2 =
-        new LocalFileLock(carbonTableIdentifier,
+        new LocalFileLock(absoluteTableIdentifier,
             LockUsage.METADATA_LOCK);
     Assert.assertTrue(!localLock2.lock());
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
index 757f2e1..e0b6d66 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
@@ -17,6 +17,9 @@
 package org.apache.carbondata.lcm.locks;
 
 import mockit.NonStrictExpectations;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.locks.LockUsage;
@@ -95,7 +98,9 @@ public class ZooKeeperLockingTest {
 
     ZookeeperInit zki = ZookeeperInit.getInstance("127.0.0.1:" + freePort);
 
-    CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("dbName", "tableName", "tableId");
+    AbsoluteTableIdentifier tableIdentifier = AbsoluteTableIdentifier
+        .from(CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
+            "dbName", "tableName");
     ZooKeeperLocking zkl =
         new ZooKeeperLocking(tableIdentifier,
             LockUsage.METADATA_LOCK);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 0f919ab..58cc019 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -99,14 +99,15 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 public class StoreCreator {
 
   private static AbsoluteTableIdentifier absoluteTableIdentifier;
-
+  private static String storePath = "";
   static {
     try {
-      String storePath = new File("target/store").getCanonicalPath();
+      storePath = new File("target/store").getCanonicalPath();
       String dbName = "testdb";
       String tableName = "testtable";
       absoluteTableIdentifier =
-          new AbsoluteTableIdentifier(storePath, new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+          new AbsoluteTableIdentifier(storePath + "/testdb/testtable",
+              new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
     } catch (IOException ex) {
 
     }
@@ -122,10 +123,10 @@ public class StoreCreator {
   public static void createCarbonStore() {
     try {
       String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
-      File storeDir = new File(absoluteTableIdentifier.getStorePath());
+      File storeDir = new File(storePath);
       CarbonUtil.deleteFoldersAndFiles(storeDir);
       CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
-          absoluteTableIdentifier.getStorePath());
+          storePath);
 
       CarbonTable table = createTable();
       writeDictionary(factFilePath, table);
@@ -137,7 +138,7 @@ public class StoreCreator {
       loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
       loadModel.setFactFilePath(factFilePath);
       loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-      loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
+      loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
       loadModel.setDateFormat(null);
       loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
           CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -165,7 +166,7 @@ public class StoreCreator {
       loadModel.setFactTimeStamp(System.currentTimeMillis());
       loadModel.setMaxColumns("10");
 
-      loadData(loadModel, absoluteTableIdentifier.getStorePath());
+      loadData(loadModel, storePath);
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -174,7 +175,7 @@ public class StoreCreator {
 
   private static CarbonTable createTable() throws IOException {
     TableInfo tableInfo = new TableInfo();
-    tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
+    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
     tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
     tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -263,7 +264,7 @@ public class StoreCreator {
     tableInfo.setFactTable(tableSchema);
 
     CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
@@ -314,21 +315,24 @@ public class StoreCreator {
     }
 
     Cache dictCache = CacheProvider.getInstance()
-        .createCache(CacheType.REVERSE_DICTIONARY, absoluteTableIdentifier.getStorePath());
+        .createCache(CacheType.REVERSE_DICTIONARY);
     for (int i = 0; i < set.length; i++) {
-      ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
-      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(table.getCarbonTableIdentifier(), columnIdentifier, columnIdentifier.getDataType(),
-          CarbonStorePath.getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier()));
+      ColumnIdentifier columnIdentifier =
+          new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
+      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+          new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier,
+              columnIdentifier.getDataType(), CarbonStorePath
+              .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+                  table.getCarbonTableIdentifier()));
       CarbonDictionaryWriter writer =
-          new CarbonDictionaryWriterImpl(absoluteTableIdentifier.getStorePath(),
-              absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier);
+          new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
       for (String value : set[i]) {
         writer.write(value);
       }
       writer.close();
       writer.commit();
       Dictionary dict = (Dictionary) dictCache.get(
-          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
+          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
         		  columnIdentifier, dims.get(i).getDataType(),
               CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
       CarbonDictionarySortInfoPreparator preparator =
@@ -337,9 +341,7 @@ public class StoreCreator {
       CarbonDictionarySortInfo dictionarySortInfo =
           preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
       CarbonDictionarySortIndexWriter carbonDictionaryWriter =
-          new CarbonDictionarySortIndexWriterImpl(
-              absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier,
-              absoluteTableIdentifier.getStorePath());
+          new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
       try {
         carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
         carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
@@ -405,7 +407,7 @@ public class StoreCreator {
         format.createRecordReader(blockDetails, hadoopAttemptContext);
 
     CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
-    String[] storeLocationArray = new String[] {storeLocation};
+    String[] storeLocationArray = new String[] {storeLocation + "/" + databaseName + "/" + tableName};
     new DataLoadExecutor().execute(loadModel,
         storeLocationArray,
         new CarbonIterator[]{readerIterator});
@@ -483,7 +485,7 @@ public class StoreCreator {
   }
 
   public static String readCurrentTime() {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
     String date = null;
 
     date = sdf.format(new Date());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index 80936d1..6ee3296 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -38,7 +38,7 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
       LOGGER.info("Carbon streaming query started: " + event.id)
       val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]
       val carbonTable = sink.carbonTable
-      val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getCarbonTableIdentifier,
+      val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.STREAMING_LOCK)
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +