You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 03:28:40 UTC

[20/49] carbondata git commit: [CARBONDATA-2025] Unify all path construction through CarbonTablePath static method

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index c8a6b1d..184bf1b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
@@ -74,7 +74,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
           "true")
         loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath, uuid)
+          carbonTable.getMetadataPath, uuid)
         val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
           case load if loadMetaDataDetails.contains(load) =>
             load.setMergedLoadName(mergedLoadName)
@@ -83,11 +83,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
             load
           case other => other
         }
-        val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-            .getAbsoluteTableIdentifier)
-        SegmentStatusManager
-          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePathWithUUID(uuid),
+        SegmentStatusManager.writeLoadDetailsIntoFile(
+          CarbonTablePath.getTableStatusFilePathWithUUID(uuid),
             updatedLoadMetaDataDetails)
         carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
       } finally {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6f08154..8d3110a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -59,7 +59,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
@@ -72,7 +72,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -127,7 +127,7 @@ object CarbonDataRDDFactory {
       LOGGER.error("Not able to acquire the compaction lock for table " +
           s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
-          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+          .createCompactionRequiredFile(carbonTable.getMetadataPath, compactionType)
       // throw exception only in case of DDL trigger.
       if (compactionModel.isDDLTrigger) {
         CarbonException.analysisException(
@@ -195,7 +195,7 @@ object CarbonDataRDDFactory {
                   s"${ tableForCompaction.getDatabaseName }." +
                   s"${ tableForCompaction.getTableName}")
               val table: CarbonTable = tableForCompaction
-              val metadataPath = table.getMetaDataFilepath
+              val metadataPath = table.getMetadataPath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
 
               val newCarbonLoadModel = prepareCarbonLoadModel(table)
@@ -577,15 +577,13 @@ object CarbonDataRDDFactory {
         (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
 
       val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-        carbonTable.getMetaDataFilepath)
+        carbonTable.getMetadataPath)
         .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
                        lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
       val segmentIds = loadMetadataDetails.map(_.getLoadName)
       val segmentIdIndex = segmentIds.zipWithIndex.toMap
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getTablePath,
-        carbonTable.getCarbonTableIdentifier)
       val segmentId2maxTaskNo = segmentIds.map { segId =>
-        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
+        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath))
       }.toMap
 
       class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 07acaa5..e1bef9c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -136,7 +136,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
     val carbonMergerMapping = CarbonMergerMapping(
       tablePath,
-      carbonTable.getMetaDataFilepath,
+      carbonTable.getMetadataPath,
       mergedLoadName,
       databaseName,
       factTableName,
@@ -148,7 +148,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       currentPartitions = partitions)
     carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
     carbonLoadModel.setLoadMetadataDetails(
-      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath).toList.asJava)
     // trigger event for compaction
     val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
       AlterTableCompactionPreEvent(sqlContext.sparkSession,
@@ -234,11 +234,11 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
          CarbonDataMergerUtil
            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
-             carbonTable.getMetaDataFilepath,
+             carbonTable.getMetadataPath,
              carbonLoadModel)) ||
         CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
           loadsToMerge,
-            carbonTable.getMetaDataFilepath,
+            carbonTable.getMetadataPath,
             mergedLoadNumber,
           carbonLoadModel,
           compactionType,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 7e3b699..4645f98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,12 +34,12 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -292,9 +292,10 @@ case class CarbonAlterTableCompactionCommand(
           true)(sparkSession,
           sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
         // 5. remove checkpoint
-        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingCheckpointDir))
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingLogDir))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
       } else {
         val msg = "Failed to close streaming table, because streaming is locked for table " +
                   carbonTable.getDatabaseName() + "." + carbonTable.getTableName()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7800d3e..5dbd383 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -60,7 +60,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
@@ -258,8 +260,7 @@ case class CarbonLoadDataCommand(
           carbonLoadModel.setUseOnePass(false)
         }
         // Create table and metadata folders if not exist
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-        val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
         val fileType = FileFactory.getFileType(metadataDirectoryPath)
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
           FileFactory.mkdirs(metadataDirectoryPath, fileType)
@@ -354,9 +355,7 @@ case class CarbonLoadDataCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
-    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dictFolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
     val dimensions = carbonTable.getDimensionByTableName(
       carbonTable.getTableName).asScala.toArray
     val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -1035,4 +1034,5 @@ case class CarbonLoadDataCommand(
     val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
     (dataFrameWithTupleId)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index f8f215f..1e5885e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -45,7 +45,7 @@ case class CarbonShowLoadsCommand(
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
       limit,
-      carbonTable.getMetaDataFilepath
+      carbonTable.getMetadataPath
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index e3e4c7a..a8316e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
 import org.apache.carbondata.hadoop.util.SchemaReader
 
@@ -63,19 +63,18 @@ case class RefreshCarbonTableCommand(
     // 2.2.1 validate that all the aggregate tables are copied at the store location.
     // 2.2.2 Register the aggregate tables
     val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
     // 2.1 check if the table already register with hive then ignore and continue with the next
     // schema
     if (!sparkSession.sessionState.catalog.listTables(databaseName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
       // check the existence of the schema file to know its a carbon table
-      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
       // if schema file does not exist then the table will either non carbon table or stale
       // carbon table
       if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
         // read TableInfo
-        val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
+        val tableInfo = SchemaReader.getTableInfo(identifier)
         // 2.2 register the table with the hive check if the table being registered has
         // aggregate table then do the below steps
         // 2.2.1 validate that all the aggregate tables are copied at the store location.
@@ -99,7 +98,7 @@ case class RefreshCarbonTableCommand(
         // Register partitions to hive metastore in case of hive partitioning carbon table
         if (tableInfo.getFactTable.getPartitionInfo != null &&
             tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
-          registerAllPartitionsToHive(absoluteTableIdentifier, sparkSession)
+          registerAllPartitionsToHive(identifier, sparkSession)
         }
       } else {
         LOGGER.audit(
@@ -178,9 +177,7 @@ case class RefreshCarbonTableCommand(
     dataMapSchemaList.asScala.foreach(dataMap => {
       val tableName = dataMap.getChildSchema.getTableName
       val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath,
-        new CarbonTableIdentifier(dbName, tableName, dataMap.getChildSchema.getTableId))
-      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
       try {
         fileExist = FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
       } catch {
@@ -191,7 +188,7 @@ case class RefreshCarbonTableCommand(
         return fileExist;
       }
     })
-    return true
+    true
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 25d5e91..10d55ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.processing.exception.MultipleMatchingException
@@ -68,12 +68,11 @@ object DeleteExecution {
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val isPartitionTable = carbonTable.isHivePartitionTable
     val factPath = if (isPartitionTable) {
-      carbonTablePath.getPath
+      absoluteTableIdentifier.getTablePath
     } else {
-      carbonTablePath.getFactDir
+      CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
     }
     var segmentsTobeDeleted = Seq.empty[Segment]
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 38ac58e..c1f86ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -39,7 +39,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.DropPartitionCallable
@@ -65,8 +64,8 @@ case class CarbonAlterTableDropPartitionCommand(
     if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) {
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     if (partitionInfo == null) {
       throwMetadataException(dbName, tableName, "table is not a partition table")
     }
@@ -92,10 +91,9 @@ case class CarbonAlterTableDropPartitionCommand(
           "Dropping range interval partition is unsupported")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
 
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -108,11 +106,11 @@ case class CarbonAlterTableDropPartitionCommand(
     thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis)
     carbonMetaStore.updateTableSchemaForAlter(
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
       thriftTable,
       null,
-      table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     // sparkSession.catalog.refreshTable(tableName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 7aefbbe..3a5ed2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -41,7 +41,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -72,8 +71,8 @@ case class CarbonAlterTableSplitPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
     // keep a copy of partitionIdList before update partitionInfo.
     // will be used in partition data scan
@@ -88,9 +87,8 @@ case class CarbonAlterTableSplitPartitionCommand(
 
     updatePartitionInfo(partitionInfo, partitionIds)
 
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -100,12 +98,12 @@ case class CarbonAlterTableSplitPartitionCommand(
     wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
     val thriftTable =
       schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-    carbonMetaStore
-      .updateTableSchemaForAlter(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        thriftTable,
-        null,
-        table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    carbonMetaStore.updateTableSchemaForAlter(
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      thriftTable,
+      null,
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 0bee383..9df3241 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,11 +16,12 @@
  */
 package org.apache.spark.sql.execution.command.preaaggregate
 
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
-import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
+import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF}
@@ -35,16 +36,12 @@ import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -425,9 +422,7 @@ object PreAggregateUtil {
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getAbsoluteTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,
@@ -516,6 +511,26 @@ object PreAggregateUtil {
     }
   }
 
+  /**
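+   * Reverts the main table schema when pre-aggregate datamap creation fails.
+   *
+   * @param dbName database name of the main table
+   * @param tableName name of the main table
+   * @param numberOfChildSchema child schema count above which the schema is reverted
+   * @param sparkSession session used to look up the metastore and the carbon table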
+   */
+  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
+    (sparkSession: SparkSession): Unit = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+    carbonTable.getTableLastUpdatedTime
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
+      metastore.revertTableSchemaForPreAggCreationFailure(
+        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
+    }
+  }
+
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 4b43ea7..064ba18 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -28,7 +28,6 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
@@ -64,9 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,
@@ -76,8 +73,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
-        carbonTablePath,
-        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getAbsoluteTableIdentifier,
         sparkSession.sparkContext).process
       // generate dictionary files for the newly added columns
       new AlterTableAddColumnRDD(sparkSession.sparkContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 571e23f..f4077e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -26,8 +26,9 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
@@ -74,9 +75,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
         throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
       }
       // read the latest schema file
-      val carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 780ac8f..7bbefd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -99,10 +99,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)
 
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)(sparkSession)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c8f64e1..a55dbdd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
@@ -95,8 +95,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
-      val oldTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
-      val tableMetadataFile = oldTablePath.getPath
+      val tableMetadataFile = oldTableIdentifier.getTablePath
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
       val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
@@ -106,7 +105,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(oldTablePath)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
@@ -115,7 +114,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val fileType = FileFactory.getFileType(tableMetadataFile)
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+      var newTablePath = CarbonTablePath.getNewTablePath(
+        oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
         .getClient()
@@ -130,9 +130,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
       // changed the rename order to deal with situation when carbon table and hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(oldTablePath.getPath, fileType)
-          .renameForce(oldTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-                       newTableName)
+        val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType)
+          .renameForce(
+            CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName))
         if (!rename) {
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
           sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
@@ -162,13 +162,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       case e: Exception =>
         LOGGER.error(e, "Rename table failed: " + e.getMessage)
         if (carbonTable != null) {
-          AlterTableUtil
-            .revertRenameTableChanges(oldTableIdentifier,
-              newTableName,
-              carbonTable.getTablePath,
-              carbonTable.getCarbonTableIdentifier.getTableId,
-              timeStamp)(
-              sparkSession)
+          AlterTableUtil.revertRenameTableChanges(
+            newTableName,
+            carbonTable,
+            timeStamp)(
+            sparkSession)
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         }
         throwMetadataException(oldDatabaseName, oldTableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b44dc7e..fd09e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -46,7 +46,8 @@ import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -209,11 +210,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
     val tablePath = identifier.getTablePath
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
-      tableName.toLowerCase(), UUID.randomUUID().toString)
-    val carbonTablePath =
-      CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
       val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
@@ -240,13 +237,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    val oldTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+    val newTablePath = CarbonTablePath.getNewTablePath(
+      identifier.getTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,
@@ -341,8 +338,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
   private def createSchemaThriftFile(
       identifier: AbsoluteTableIdentifier,
       thriftTableInfo: TableInfo): String = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
@@ -356,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
     updateSchemasUpdatedTime(touchSchemaFileSystemTime())
-    carbonTablePath.getPath
+    identifier.getTablePath
   }
 
   protected def addTableCache(
@@ -431,8 +427,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     (sparkSession: SparkSession) {
     val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-      .getMetadataDirectoryPath
+    val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath)
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -528,9 +523,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
     metadata.carbonTables
 
-  override def getThriftTableInfo(tablePath: CarbonTablePath)
+  override def getThriftTableInfo(carbonTable: CarbonTable)
     (sparkSession: SparkSession): TableInfo = {
-    val tableMetadataFile = tablePath.getSchemaFilePath
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 16ef38d..5e242b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, Car
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -96,12 +96,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     Seq()
   }
 
-  override def getThriftTableInfo(tablePath: CarbonTablePath)
+  override def getThriftTableInfo(carbonTable: CarbonTable)
     (sparkSession: SparkSession): format.TableInfo = {
-    val identifier = tablePath.getCarbonTableIdentifier
-    val relation = lookupRelation(TableIdentifier(identifier.getTableName,
-      Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation]
-    val carbonTable = relation.metaData.carbonTable
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
       carbonTable.getDatabaseName,
@@ -148,7 +144,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       carbonTablePath: String)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    updateHiveMetaStoreForAlter(newTableIdentifier,
+    updateHiveMetaStoreForAlter(
+      newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
       carbonTablePath,
@@ -163,7 +160,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
     val newTablePath =
-      CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
+      CarbonTablePath.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 93c7c09..0645040 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -143,7 +143,7 @@ trait CarbonMetaStore {
 
   def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
 
-  def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
+  def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo
 
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 7bf8536..c9833d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Represents logical plan for one carbon table
@@ -212,10 +212,7 @@ case class CarbonRelation(
         .getValidAndInvalidSegments.getValidSegments.isEmpty) {
         sizeInBytesLocalValue = 0L
       } else {
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(
-          carbonTable.getTablePath,
-          carbonTable.getCarbonTableIdentifier)
-        val tablePath = carbonTablePath.getPath
+        val tablePath = carbonTable.getTablePath
         val fileType = FileFactory.getFileType(tablePath)
         if (FileFactory.isFileExist(tablePath, fileType)) {
           // get the valid segments

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8ebd5a9..bc36e9c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -36,7 +36,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -176,41 +177,28 @@ object AlterTableUtil {
 
   /**
    * This method reverts the changes to the schema if the rename table command fails.
-   *
-   * @param oldTableIdentifier
-   * @param newTableName
-   * @param timeStamp
-   * @param sparkSession
    */
-  def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
+  def revertRenameTableChanges(
       newTableName: String,
-      tablePath: String,
-      tableId: String,
+      oldCarbonTable: CarbonTable,
       timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
-    val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val oldCarbonTableIdentifier = new CarbonTableIdentifier(database,
-      oldTableIdentifier.table, tableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
+    val tablePath = oldCarbonTable.getTablePath
+    val tableId = oldCarbonTable.getCarbonTableIdentifier.getTableId
+    val oldCarbonTableIdentifier = oldCarbonTable.getCarbonTableIdentifier
+    val database = oldCarbonTable.getDatabaseName
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableName)
+    val newTablePath = CarbonTablePath.getNewTablePath(tablePath, newTableName)
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val fileType = FileFactory.getFileType(tablePath)
     if (FileFactory.isFileExist(tablePath, fileType)) {
-      val tableInfo = if (metastore.isReadFromHiveMetaStore) {
-        // In case of hive metastore we first update the carbonschema inside old table only.
-        metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath,
-          new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession)
-      } else {
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      }
+      val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession)
       val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
       val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
       if (updatedTime == timeStamp) {
-        LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
-        FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
-          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-                       oldTableIdentifier.table)
+        LOGGER.error(s"Reverting changes for $database.${oldCarbonTable.getTableName}")
+        FileFactory.getCarbonFile(tablePath, fileType)
+          .renameForce(CarbonTablePath.getNewTablePath(tablePath, oldCarbonTable.getTableName))
         val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
           newTablePath,
           newCarbonTableIdentifier)
@@ -233,9 +221,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-      carbonTable.getCarbonTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -260,9 +246,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-      carbonTable.getCarbonTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -293,9 +277,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-      carbonTable.getCarbonTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -344,9 +326,7 @@ object AlterTableUtil {
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       // get the latest carbon table
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-        carbonTable.getCarbonTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index aadee81..0bdef8a 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -856,9 +856,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   }
 
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index e3678cd..1d41ddc 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.TableOptionConstant
 
@@ -65,9 +65,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setCsvHeaderColumns(
       CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     // Create table and metadata folders if not exist
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
-    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index f9519f8..a465251 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index e543893..7ca0b56 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.TableOptionConstant
@@ -179,9 +179,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     carbonLoadModel.setMaxColumns("100")
     // Create table and metadata folders if not exist
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
-    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 5644302..017bca8 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -33,10 +33,14 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
@@ -197,7 +201,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("batch_table", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
@@ -205,7 +208,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       thread1.start()
       // use thread pool to catch the exception of sink thread
       val pool = Executors.newSingleThreadExecutor()
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier)
       val future = pool.submit(thread2)
       Thread.sleep(1000)
       thread1.interrupt()
@@ -225,11 +228,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val csvDataDir = new File("target/csvdata").getCanonicalPath
     // streaming ingest 10 rows
     generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
-    val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
       identifier)
     thread.start()
     Thread.sleep(2000)
@@ -1086,12 +1088,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
       val thread1 = createWriteSocketThread(server, 2, 10, 3)
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier, "force", 5, 1024L * 200, false)
       thread1.start()
       thread2.start()
       Thread.sleep(1000)
@@ -1200,7 +1201,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       port: Int,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
@@ -1221,7 +1222,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
@@ -1260,7 +1261,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier(tableName, Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket()
@@ -1273,7 +1273,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       val thread2 = createSocketStreamingThread(
         spark = spark,
         port = server.getLocalPort,
-        tablePath = tablePath,
+        carbonTable = carbonTable,
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
@@ -1321,7 +1321,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
   def createFileStreamingThread(
       spark: SparkSession,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       csvDataDir: String,
       intervalSecond: Int,
       tableIdentifier: TableIdentifier): Thread = {
@@ -1335,7 +1335,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 4d5f88c..27ed1bd 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -94,7 +94,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       }
       val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default", "reverttest")
 
-      assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
+      assert(new File(carbonTable.getMetadataPath).listFiles().length < 6)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index a8db6c9..bbc3697 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -56,43 +55,39 @@ public class TableProcessingOperations {
    */
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    String metaDataLocation = carbonTable.getMetadataPath();
     final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String partitionPath = carbonTablePath.getPartitionDir();
-      FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
+    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+    if (FileFactory.isFileExist(partitionPath, fileType)) {
+      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile path) {
           String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+              CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+          boolean found = false;
+          for (int j = 0; j < details.length; j++) {
+            if (details[j].getLoadName().equals(segmentId)) {
+              found = true;
+              break;
             }
           }
+          return !found;
+        }
+      });
+      for (int k = 0; k < listFiles.length; k++) {
+        String segmentId =
+            CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+        if (isCompactionFlow) {
+          if (segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
+        } else {
+          if (!segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4cd5014..193d192 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -34,8 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -105,12 +103,11 @@ public class FieldEncoderFactory {
           ColumnIdentifier parentColumnIdentifier =
               new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
                   dataField.getColumn().getDataType());
-          CarbonTablePath carbonTablePath =
-              CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
           AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
               AbsoluteTableIdentifier.from(
-              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()),
-              parentTableIdentifier);
+                  CarbonTablePath.getNewTablePath(
+                      absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()),
+                  parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType());
           return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d453d4b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index d3caa99..a08177a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger;
 
 import java.util.List;
 
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor {
   public abstract boolean execute(List<RawResultIterator> resultIteratorList);
 
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
-      CompactionType compactionType, CarbonTable carbonTable,
-      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
       long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
-          CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
-              carbonTable.getCarbonTableIdentifier()));
+          loadModel.getTablePath());
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.
       long index = taskNo + 1;