Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 06:48:09 UTC

[2/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface
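
Editor's note: the hunks below all apply one cleanup pattern. A minimal sketch of the
before/after, using only identifiers that appear in the diffs (this summary is
illustrative and not part of the commit; StorePathCleanupSketch and resolveTable are
hypothetical names for illustration):

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.util.CarbonProperties

    object StorePathCleanupSketch {
      // Before: each command duplicated the metastore lookup and unwrapped tableMeta:
      //   CarbonEnv.getInstance(sparkSession).carbonMetastore
      //     .lookupRelation(databaseNameOp, tableName)(sparkSession)
      //     .asInstanceOf[CarbonRelation].tableMeta.carbonTable
      // After: table resolution is centralized behind a single helper.
      def resolveTable(databaseNameOp: Option[String], tableName: String)
          (sparkSession: SparkSession): CarbonTable = {
        CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
      }

      // The store path likewise moves off the per-session field
      // (CarbonEnv.getInstance(sparkSession).storePath) to a static accessor,
      // so commands no longer thread storePath through as a parameter.
      def storePath: String = CarbonProperties.getStorePath

      // A related rename rides along throughout the diff:
      // CarbonTable.getFactTableName becomes CarbonTable.getTableName.
    }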

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 822455c..64a066c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -47,9 +47,7 @@ case class CarbonDataMapShowCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
     if (schemaList != null && schemaList.size() > 0) {
       schemaList.asScala.map { s =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 66f2756..f34afbf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events._
 
 
@@ -60,12 +61,11 @@ case class CarbonDropDataMapCommand(
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier =
       AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
@@ -76,20 +76,19 @@ case class CarbonDropDataMapCommand(
         lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
-      val carbonTable: Option[CarbonTable] =
-        catalog.getTableFromMetadataCache(dbName, tableName) match {
-          case Some(tableMeta) => Some(tableMeta.carbonTable)
-          case None => try {
-            Some(catalog.lookupRelation(identifier)(sparkSession)
-              .asInstanceOf[CarbonRelation].metaData.carbonTable)
-          } catch {
-            case ex: NoSuchTableException =>
-              if (!ifExistsSet) {
-                throw ex
-              }
-              None
-          }
+      var carbonTable: Option[CarbonTable] =
+        catalog.getTableFromMetadataCache(dbName, tableName)
+      if (carbonTable.isEmpty) {
+        try {
+          carbonTable = Some(catalog.lookupRelation(identifier)(sparkSession)
+            .asInstanceOf[CarbonRelation].metaData.carbonTable)
+        } catch {
+          case ex: NoSuchTableException =>
+            if (!ifExistsSet) {
+              throw ex
+            }
         }
+      }
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
         val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
@@ -144,7 +143,7 @@ case class CarbonDropDataMapCommand(
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 947cea1..2f04feb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -57,21 +57,21 @@ case class AlterTableCompactionCommand(
     if (relation == null) {
       sys.error(s"Table $databaseName.$tableName does not exist")
     }
-    if (null == relation.tableMeta.carbonTable) {
+    if (null == relation.carbonTable) {
       LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
       sys.error(s"alter table failed. table not found: $databaseName.$tableName")
     }
 
     val carbonLoadModel = new CarbonLoadModel()
 
-    val table = relation.tableMeta.carbonTable
-    carbonLoadModel.setTableName(table.getFactTableName)
+    val table = relation.carbonTable
+    carbonLoadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setTablePath(relation.tableMeta.carbonTable.getTablePath)
+    carbonLoadModel.setTableName(relation.carbonTable.getTableName)
+    carbonLoadModel.setDatabaseName(relation.carbonTable.getDatabaseName)
+    carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
 
     var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 2003bb1..32d6b80 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -37,9 +37,7 @@ case class CarbonShowLoadsCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
       GetDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 8b0dab7..58e33b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, Runn
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
 
 case class CleanFilesCommand(
@@ -38,7 +39,7 @@ case class CleanFilesCommand(
     if (forceTableClean) {
       val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
       val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-        CarbonEnv.getInstance(sparkSession).storePath)
+        CarbonProperties.getStorePath)
       // TODO: TAABLEPATH
       CarbonStore.cleanFiles(
         dbName,
@@ -47,10 +48,7 @@ case class CleanFilesCommand(
         null,
         forceTableClean)
     } else {
-      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val relation = catalog
-        .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
-      val carbonTable = relation.tableMeta.carbonTable
+      val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
       val cleanFilesPreEvent: CleanFilesPreEvent =
         CleanFilesPreEvent(carbonTable,
           sparkSession)
@@ -59,7 +57,7 @@ case class CleanFilesCommand(
       CarbonStore.cleanFiles(
         GetDB.getDatabaseName(databaseNameOp, sparkSession),
         tableName,
-        relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
+        CarbonProperties.getStorePath,
         carbonTable,
         forceTableClean)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
index 6a0465c..5b305ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
@@ -35,9 +35,7 @@ case class DeleteLoadByIdCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val operationContext = new OperationContext
 
     val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
index 83f41bb..00c35a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
@@ -37,9 +37,7 @@ case class DeleteLoadByLoadDateCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val operationContext = new OperationContext
     val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
       DeleteSegmentByDatePreEvent(carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
index 3f0e093..845a64c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
@@ -47,7 +47,7 @@ case class LoadTableByInsertCommand(
       Some(df)).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
     relation.carbonRelation.metaData =
-      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)
     load
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 777c169..0f4ca01 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -54,7 +55,8 @@ case class LoadTableCommand(
     isOverwriteTable: Boolean,
     var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None,
-    updateModel: Option[UpdateTableModel] = None)
+    updateModel: Option[UpdateTableModel] = None,
+    var tableInfoOp: Option[TableInfo] = None)
   extends RunnableCommand with DataProcessCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -72,16 +74,6 @@ case class LoadTableCommand(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    if (null == relation.tableMeta.carbonTable) {
-      LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
-      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
-      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
-    }
 
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -105,18 +97,30 @@ case class LoadTableCommand(
     // update the property with new value
     carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
 
-    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
-
-    val tableProperties = relation.tableMeta.carbonTable.getTableInfo
-      .getFactTable.getTableProperties
+    try {
+      val table = if (tableInfoOp.isDefined) {
+        CarbonTable.buildFromTableInfo(tableInfoOp.get)
+      } else {
+        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        if (relation == null) {
+          sys.error(s"Table $dbName.$tableName does not exist")
+        }
+        if (null == relation.carbonTable) {
+          LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+          sys.error(s"Data loading failed. table not found: $dbName.$tableName")
+        }
+        relation.carbonTable
+      }
 
-    optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+      optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+          carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
-    try {
-      val table = relation.tableMeta.carbonTable
       val carbonLoadModel = new CarbonLoadModel()
       val factPath = if (dataFrame.isDefined) {
         ""
@@ -137,11 +141,9 @@ case class LoadTableCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
-        val storePath = relation.tableMeta.storePath
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty) {
-          CommonUtil.
-            readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable)
+          CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
         }
         if (isOverwriteTable) {
           LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
@@ -158,8 +160,7 @@ case class LoadTableCommand(
           carbonLoadModel.setUseOnePass(false)
         }
         // Create table and metadata folders if not exist
-        val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
         val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
         val fileType = FileFactory.getFileType(metadataDirectoryPath)
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
@@ -192,9 +193,9 @@ case class LoadTableCommand(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
-          val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+          val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
                                   table.getDatabaseName + "/" +
-                                  table.getFactTableName + "/"
+                                  table.getTableName + "/"
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory
@@ -234,7 +235,7 @@ case class LoadTableCommand(
       .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
     val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
     val dimensions = carbonTable.getDimensionByTableName(
-      carbonTable.getFactTableName).asScala.toArray
+      carbonTable.getTableName).asScala.toArray
     val colDictFilePath = carbonLoadModel.getColDictFilePath
     if (!StringUtils.isEmpty(colDictFilePath)) {
       carbonLoadModel.initPredefDictMap()
@@ -378,8 +379,7 @@ case class LoadTableCommand(
     val identifier = model.table.getCarbonTableIdentifier
     // update CarbonDataLoadSchema
     val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
-      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a52008a..efb6796 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -64,24 +63,18 @@ object DeleteExecution {
       sparkSession: SparkSession,
       dataRdd: RDD[Row],
       timestamp: String,
-      relation: CarbonRelation,
       isUpdateOperation: Boolean,
-      executorErrors: ExecutionErrors
-  ): Boolean = {
+      executorErrors: ExecutionErrors): Boolean = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
     val tableName = getTableIdentifier(identifier).table
     val database = GetDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-
-    val absoluteTableIdentifier = relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+    val carbonTable = CarbonEnv.getCarbonTable(Some(database), tableName)(sparkSession)
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = CarbonStorePath
       .getCarbonTablePath(absoluteTableIdentifier)
     val factPath = carbonTablePath.getFactDir
 
-    val carbonTable = relation.tableMeta.carbonTable
     var deleteStatus = true
     val deleteRdd = if (isUpdateOperation) {
       val schema =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 34daf4e..6762489 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -54,7 +54,7 @@ object HorizontalCompaction {
     }
 
     var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
-    val carbonTable = carbonRelation.tableMeta.carbonTable
+    val carbonTable = carbonRelation.carbonTable
     val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val updateTimeStamp = System.currentTimeMillis()
     // To make sure that update and delete timestamps are not same,
@@ -116,7 +116,7 @@ object HorizontalCompaction {
       factTimeStamp: Long,
       segLists: util.List[String]): Unit = {
     val db = carbonTable.getDatabaseName
-    val table = carbonTable.getFactTableName
+    val table = carbonTable.getTableName
     // get the valid segments qualified for update compaction.
     val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
       absTableIdentifier,
@@ -133,7 +133,7 @@ object HorizontalCompaction {
     try {
       // Update Compaction.
       val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
-        carbonTable.getFactTableName,
+        carbonTable.getTableName,
         Some(segmentUpdateStatusManager),
         CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
         Some(factTimeStamp),
@@ -167,7 +167,7 @@ object HorizontalCompaction {
       segLists: util.List[String]): Unit = {
 
     val db = carbonTable.getDatabaseName
-    val table = carbonTable.getFactTableName
+    val table = carbonTable.getTableName
     val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
       absTableIdentifier,
       segmentUpdateStatusManager,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index b18ab78..5817d88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -60,7 +60,7 @@ object IUDCommonUtil {
           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
             .getDatabaseName + "." +
           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-            .getFactTableName
+            .getTableName
         val sementProperty = carbonProperties
           .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbAndTb, "")
         if (!(sementProperty.equals("") || sementProperty.trim.equals("*"))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index a898822..cf5bfd8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -51,7 +51,7 @@ private[sql] case class ProjectForDeleteCommand(
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
       asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
+    val carbonTable = relation.carbonTable
 
     // trigger event for Delete from table
     val operationContext = new OperationContext
@@ -77,9 +77,8 @@ private[sql] case class ProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      if (DeleteExecution
-        .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
-          isUpdateOperation = false, executorErrors)) {
+      if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
+        isUpdateOperation = false, executorErrors)) {
         // call IUD Compaction.
         HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
           isUpdateOperation = false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 549c58f..da62f27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
 import org.apache.carbondata.processing.loading.FailureCauses
 
@@ -58,7 +57,7 @@ private[sql] case class ProjectForUpdateCommand(
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
       asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
+    val carbonTable = relation.carbonTable
 
     // trigger event for Update table
     val operationContext = new OperationContext
@@ -74,7 +73,7 @@ private[sql] case class ProjectForUpdateCommand(
     val currentTime = CarbonUpdateUtil.readCurrentTime
     //    var dataFrame: DataFrame = null
     var dataSet: DataFrame = null
-    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
+    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
@@ -83,7 +82,6 @@ private[sql] case class ProjectForUpdateCommand(
       else {
         throw new Exception("Table is locked for updation. Please try after some time")
       }
-      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // Get RDD.
 
       dataSet = if (isPersistEnabled) {
@@ -93,7 +91,7 @@ private[sql] case class ProjectForUpdateCommand(
       else {
         Dataset.ofRows(sparkSession, plan)
       }
-      var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
 
 
       // handle the clean up of IUD.
@@ -101,8 +99,7 @@ private[sql] case class ProjectForUpdateCommand(
 
       // do delete operation.
       DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
-        currentTime + "",
-        relation, isUpdateOperation = true, executionErrors)
+        currentTime + "", isUpdateOperation = true, executionErrors)
 
       if(executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index acd9bd3..5a0e4cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -65,8 +65,7 @@ case class AlterTableDropCarbonPartitionCommand(
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
-    val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val tablePath = relation.tableMeta.tablePath
+    val tablePath = relation.carbonTable.getTablePath
     carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
@@ -75,7 +74,7 @@ case class AlterTableDropCarbonPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
     }
-    val table = relation.tableMeta.carbonTable
+    val table = relation.carbonTable
     val partitionInfo = table.getPartitionInfo(tableName)
     if (partitionInfo == null) {
       sys.error(s"Table $tableName is not a partition table.")
@@ -101,7 +100,7 @@ case class AlterTableDropCarbonPartitionCommand(
         sys.error(s"Dropping range interval partition isn't support yet!")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -142,17 +141,13 @@ case class AlterTableDropCarbonPartitionCommand(
       locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
         locksToBeAcquired)(sparkSession)
       val carbonLoadModel = new CarbonLoadModel()
-      val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
-        .asInstanceOf[CarbonRelation]
-      val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-      val table = relation.tableMeta.carbonTable
+      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setTablePath(relation.tableMeta.tablePath)
+      carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setDatabaseName(table.getDatabaseName)
+      carbonLoadModel.setTablePath(table.getTablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       alterTableDropPartition(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 0973226..841da67 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -69,8 +69,7 @@ case class AlterTableSplitCarbonPartitionCommand(
     val tableName = splitPartitionModel.tableName
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
-    val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val tablePath = relation.tableMeta.tablePath
+    val tablePath = relation.carbonTable.getTablePath
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
@@ -79,7 +78,7 @@ case class AlterTableSplitCarbonPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
     }
-    val table = relation.tableMeta.carbonTable
+    val table = relation.carbonTable
     val partitionInfo = table.getPartitionInfo(tableName)
     val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
     // keep a copy of partitionIdList before update partitionInfo.
@@ -95,7 +94,7 @@ case class AlterTableSplitCarbonPartitionCommand(
 
     updatePartitionInfo(partitionInfo, partitionIds)
 
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -150,16 +149,12 @@ case class AlterTableSplitCarbonPartitionCommand(
       locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
         locksToBeAcquired)(sparkSession)
       val carbonLoadModel = new CarbonLoadModel()
-      val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
-        .asInstanceOf[CarbonRelation]
-      val tablePath = relation.tableMeta.tablePath
-      val table = relation.tableMeta.carbonTable
-      val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val tablePath = table.getTablePath
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setDatabaseName(table.getDatabaseName)
       carbonLoadModel.setTablePath(tablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
index 224304a..903e93b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
@@ -41,10 +41,9 @@ private[sql] case class ShowCarbonPartitionsCommand(
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
-    val tableName = carbonTable.getFactTableName
+      .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val carbonTable = relation.carbonTable
+    val tableName = carbonTable.getTableName
     val partitionInfo = carbonTable.getPartitionInfo(
       carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
     if (partitionInfo == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dd002f0..3854f76 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -73,7 +73,7 @@ case class CreatePreAggregateTableCommand(
     // getting the parent table
     val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
     // getting the table name
-    val parentTableName = parentTable.getFactTableName
+    val parentTableName = parentTable.getTableName
     // getting the db name of parent table
     val parentDbName = parentTable.getDatabaseName
 
@@ -85,9 +85,8 @@ case class CreatePreAggregateTableCommand(
     tableModel.dataMapRelation = Some(fieldRelationMap)
     CarbonCreateTableCommand(tableModel).run(sparkSession)
     try {
-      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation( tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
-      val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+      val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+      val tableInfo = table.getTableInfo
       // child schema object which will be updated on parent table about the
       val childSchema = tableInfo.getFactTable
         .buildChildSchema(dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 506a405..f64deec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -84,7 +84,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
         s"Cannot change data type for columns in pre-aggregate table ${ carbonTable.getDatabaseName
-        }.${ carbonTable.getFactTableName }")
+        }.${ carbonTable.getTableName }")
     }
   }
 }
@@ -102,7 +102,7 @@ object PreAggregateAddColumnsPreListener extends OperationEventListener {
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
         s"Cannot add columns in pre-aggreagate table ${ carbonTable.getDatabaseName
-        }.${ carbonTable.getFactTableName }")
+        }.${ carbonTable.getTableName }")
     }
   }
 }
@@ -185,7 +185,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
     }
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
-        carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
+        carbonTable.getDatabaseName}.${ carbonTable.getTableName }")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 3193310..1647f9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -326,21 +326,19 @@ object PreAggregateUtil {
     var numberOfCurrentChild: Int = 0
     try {
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
+        carbonTable.getAbsoluteTableIdentifier)
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(thriftTableInfo,
-          dbName,
-          tableName,
-          carbonTable.getTablePath)
+      val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+        thriftTableInfo,
+        dbName,
+        tableName,
+        carbonTable.getTablePath)
       numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
@@ -374,7 +372,7 @@ object PreAggregateUtil {
   def updateSchemaInfo(carbonTable: CarbonTable,
       thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
     val dbName = carbonTable.getDatabaseName
-    val tableName = carbonTable.getFactTableName
+    val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
       .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
@@ -435,31 +433,30 @@ object PreAggregateUtil {
   def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     carbonTable.getTableLastUpdatedTime
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
-      metastore
-        .revertTableSchemaForPreAggCreationFailure(carbonTable.getAbsoluteTableIdentifier,
-          thriftTable)(sparkSession)
+      metastore.revertTableSchemaForPreAggCreationFailure(
+        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
     }
   }
 
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    metaStore.getTableFromMetadataCache(databaseName, tableName) match {
-      case Some(tableMeta) => Some(tableMeta.carbonTable)
-      case None => try {
+    val carbonTable = metaStore.getTableFromMetadataCache(databaseName, tableName)
+    if (carbonTable.isEmpty) {
+      try {
         Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
           .asInstanceOf[CarbonRelation].metaData.carbonTable)
       } catch {
         case _: Exception =>
           None
       }
+    } else {
+      carbonTable
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 2132131..3b39334 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -57,9 +57,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       // older carbon table and this can lead to inconsistent state in the system. Therefor look
       // up relation should be called after acquiring the lock
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val alterTableAddColumnListener = AlterTableAddColumnPreEvent(carbonTable,
         alterTableAddColumnsModel)
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index e44899e..c24a8e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -51,9 +51,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(carbonTable,
         alterTableDataTypeChangeModel)
       OperationListenerBus.getInstance().fireEvent(alterTableDataTypeChangeListener)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index dae2d7b..721dd0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -53,9 +53,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val partitionInfo = carbonTable.getPartitionInfo(tableName)
       if (partitionInfo != null) {
         val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f1cce13..e7beedd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -81,9 +81,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       locks = AlterTableUtil
         .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
           sparkSession)
-      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].tableMeta
-      carbonTable = tableMeta.carbonTable
+      carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].carbonTable
       // invalidate the data maps for the old table, see CARBON-1690
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
@@ -134,7 +133,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,
-        tableMeta.tablePath)(sparkSession)
+        carbonTable.getTablePath)(sparkSession)
 
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,

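With TableMeta removed, the rename command reads the table path from the CarbonTable
itself. A one-line sketch of the assumed equivalence (illustrative, not part of the
diff):

    // tableMeta.tablePath is replaced by the path carried on the table's identifier.
    val tablePath: String = carbonTable.getAbsoluteTableIdentifier.getTablePath
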
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index c126b25..a060833 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -357,11 +357,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
   private def getPartitioning(carbonTable: CarbonTable,
       output: Seq[Attribute]): Partitioning = {
-    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)
     if (info != null) {
       val cols = info.getListOfColumns.asScala
       val sortColumn = carbonTable.
-        getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
+        getDimensionByTableName(carbonTable.getTableName).get(0).getColName
       val numBuckets = info.getNumberOfBuckets
       val bucketColumns = cols.flatMap { n =>
         val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))

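The getFactTableName accessor is renamed to getTableName throughout this commit; a
CarbonTable models exactly one main table, so the "fact table" wording was redundant.
Condensed usage as it reads after the rename (free-standing sketch, assuming a
carbonTable value is in scope):

    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)
    if (info != null) {
      // first dimension in create order drives the sort-column lookup above
      val firstSortColumn = carbonTable
        .getDimensionByTableName(carbonTable.getTableName).get(0).getColName
    }
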
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ef2e0a5..d6450c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -51,8 +51,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
         val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(tableIdentifier)(
-            sparkSession)
+          .tableExists(tableIdentifier)(sparkSession)
         if (isCarbonTable) {
           val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
           ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
@@ -155,13 +154,13 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
       case AlterTableSetPropertiesCommand(tableName, properties, isView)
-        if (CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .tableExists(tableName)(sparkSession)) => {
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableName)(sparkSession) => {
         ExecutedCommandExec(AlterTableSetCommand(tableName, properties, isView)) :: Nil
       }
       case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView)
-        if (CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .tableExists(tableName)(sparkSession)) => {
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableName)(sparkSession) => {
         ExecutedCommandExec(AlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
       }
       case _ => Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 9ebf47e..49a57e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.AlterTableRenameCommand
 import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 import org.apache.spark.sql.hive.CarbonRelation
@@ -76,7 +76,6 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
     val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession)
       .asInstanceOf[CarbonRelation]
-      .tableMeta
       .carbonTable
       .isStreamingTable
     if (streaming) {

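The streaming-table guard now dereferences carbonTable straight off the relation. A
sketch of the guard as a whole, with the enclosing method name and error message
assumed for illustration (sparkSession is the strategy's constructor field):

    private def rejectIfStreamingTable(tableIdentifier: TableIdentifier,
        operation: String): Unit = {
      val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
        .lookupRelation(tableIdentifier)(sparkSession)
        .asInstanceOf[CarbonRelation]
        .carbonTable
        .isStreamingTable
      if (streaming) {
        throw new UnsupportedOperationException(
          s"$operation is not allowed on streaming table")
      }
    }
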
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6d80a26..87c919d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
@@ -44,13 +43,12 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
+case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
   // clear the metadata
   def clear(): Unit = {
-    tablesMeta.clear()
+    carbonTables.clear()
   }
 }
 
@@ -80,7 +78,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     System.nanoTime() + ""
   }
 
-  val metadata = MetaData(new ArrayBuffer[TableMeta]())
+  val metadata = MetaData(new ArrayBuffer[CarbonTable]())
 
 
   /**
@@ -98,13 +96,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tables = getTableFromMetadataCache(database, tableName)
     tables match {
       case Some(t) =>
-        CarbonRelation(database, tableName,
-          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+        CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
       case None =>
         readCarbonSchema(absIdentifier) match {
           case Some(meta) =>
             CarbonRelation(database, tableName,
-              CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+              CarbonSparkUtil.createSparkMeta(meta), meta)
           case None =>
             throw new NoSuchTableException(database, tableName)
         }
@@ -151,7 +148,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val operationContext = new OperationContext
     val lookupRelationPostEvent: LookupRelationPostEvent =
       LookupRelationPostEvent(
-        relation.tableMeta.carbonTable,
+        relation.carbonTable,
         sparkSession)
     OperationListenerBus.getInstance.fireEvent(lookupRelationPostEvent, operationContext)
     relation
@@ -164,10 +161,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tableName
    * @return
    */
-  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
-    metadata.tablesMeta
-      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-        c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+  def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
+    metadata.carbonTables
+      .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
+        table.getTableName.equalsIgnoreCase(tableName))
   }
 
   def tableExists(
@@ -187,7 +184,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     true
   }
 
-  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[CarbonTable] = {
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
     val tablePath = identifier.getTablePath
@@ -210,12 +207,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
         .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-      val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
-        identifier.getTablePath,
-        identifier.getTablePath,
-        carbonTable)
-      metadata.tablesMeta += tableMeta
-      Some(tableMeta)
+      metadata.carbonTables += carbonTable
+      Some(carbonTable)
     } else {
       None
     }
@@ -378,10 +371,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getTablePath,
-      absoluteTableIdentifier.getTablePath,
-      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
-    metadata.tablesMeta += tableMeta
+    metadata.carbonTables +=
+      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
   }
 
   /**
@@ -391,10 +382,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tableName
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
-    metadataToBeRemoved match {
-      case Some(tableMeta) =>
-        metadata.tablesMeta -= tableMeta
+    val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
+    carbonTableToBeRemoved match {
+      case Some(carbonTable) =>
+        metadata.carbonTables -= carbonTable
         CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
       case None =>
         if (LOGGER.isDebugEnabled) {
@@ -409,10 +400,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
       wrapperTableInfo.getTableUniqueName)
-    for (i <- metadata.tablesMeta.indices) {
+    for (i <- metadata.carbonTables.indices) {
       if (wrapperTableInfo.getTableUniqueName.equals(
-        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
-        metadata.tablesMeta(i).carbonTable = carbonTable
+        metadata.carbonTables(i).getTableUniqueName)) {
+        metadata.carbonTables(i) = carbonTable
       }
     }
   }
@@ -434,8 +425,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
     try {
-      val tablePath = lookupRelation(tableIdentifier)(sparkSession).
-        asInstanceOf[CarbonRelation].tableMeta.tablePath
+      val tablePath = lookupRelation(tableIdentifier)(sparkSession)
+        .asInstanceOf[CarbonRelation].carbonTable.getTablePath
       val fileType = FileFactory.getFileType(tablePath)
       FileFactory.isFileExist(tablePath, fileType)
     } catch {
@@ -531,13 +522,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
   }
 
   private def refreshCache() {
-    metadata.tablesMeta.clear()
+    metadata.carbonTables.clear()
   }
 
   override def isReadFromHiveMetaStore: Boolean = false
 
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
-    metadata.tablesMeta.map(_.carbonTable)
+    metadata.carbonTables
 
   override def getThriftTableInfo(tablePath: CarbonTablePath)
     (sparkSession: SparkSession): TableInfo = {

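After this file's changes the metastore cache is a plain ArrayBuffer[CarbonTable],
matched case-insensitively on database and table name. Condensed cache operations as
they now read, with db, tbl and carbonTable standing in for the real arguments
(illustrative):

    // lookup: case-insensitive match on database and table name
    val hit: Option[CarbonTable] = metadata.carbonTables.find { t =>
      t.getDatabaseName.equalsIgnoreCase(db) && t.getTableName.equalsIgnoreCase(tbl)
    }
    // insert and evict are plain buffer operations
    metadata.carbonTables += carbonTable
    metadata.carbonTables -= carbonTable
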
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index dedaf1c..4d4229a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -21,20 +21,17 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
 import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 /**
  * Metastore to store the carbon schema in Hive
@@ -56,10 +53,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
     if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
-      val meta = new TableMeta(table.getCarbonTableIdentifier,
-        absIdentifier.getTablePath, absIdentifier.getTablePath, table)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
-        CarbonSparkUtil.createSparkMeta(table), meta)
+        CarbonSparkUtil.createSparkMeta(table), table)
     } else {
       super.createCarbonRelation(parameters, absIdentifier, sparkSession)
     }
@@ -107,7 +102,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
       carbonTable.getDatabaseName,
-      carbonTable.getFactTableName)
+      carbonTable.getTableName)
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 357a812..696342f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
+
 
 /**
  * Interface for the Carbon metastore
@@ -140,7 +140,7 @@ trait CarbonMetaStore {
 
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
 
-  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta]
+  def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
 
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 3fb0db0..c48e6e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -761,7 +761,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
           .DEFAULT_MAX_NUMBER_OF_COLUMNS
         )
     }
-    val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+    val isAggregateTable = !relation.carbonRelation.carbonTable.getTableInfo
       .getParentRelationIdentifiers.isEmpty
     // transform logical plan if the load is for aggregate table.
     val childPlan = if (isAggregateTable) {

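A pre-aggregate (child) table records its parent tables in TableInfo, so a non-empty
parent list marks the load target as an aggregate table; plain main tables have no
parents. The check after the TableMeta removal, restated as in the hunk above:

    val isAggregateTable = !relation.carbonRelation.carbonTable.getTableInfo
      .getParentRelationIdentifiers.isEmpty
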
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 2c476ed..9187fe2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -29,10 +29,10 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.merger.TableMeta
 
 /**
  * Represents logical plan for one carbon table
@@ -41,7 +41,7 @@ case class CarbonRelation(
     databaseName: String,
     tableName: String,
     var metaData: CarbonMetaData,
-    tableMeta: TableMeta)
+    carbonTable: CarbonTable)
   extends LeafNode with MultiInstanceRelation {
 
   def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
@@ -84,17 +84,17 @@ case class CarbonRelation(
   }
 
   override def newInstance(): LogicalPlan = {
-    CarbonRelation(databaseName, tableName, metaData, tableMeta)
+    CarbonRelation(databaseName, tableName, metaData, carbonTable)
       .asInstanceOf[this.type]
   }
 
-  val dimensionsAttr = {
+  val dimensionsAttr: Seq[AttributeReference] = {
     val sett = new LinkedHashSet(
-      tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
+      carbonTable.getDimensionByTableName(carbonTable.getTableName)
         .asScala.asJava)
     sett.asScala.toSeq.map(dim => {
       val dimval = metaData.carbonTable
-        .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+        .getDimensionByName(metaData.carbonTable.getTableName, dim.getColName)
       val output: DataType = dimval.getDataType.getName.toLowerCase match {
         case "array" =>
           CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
@@ -113,11 +113,10 @@ case class CarbonRelation(
   }
 
   val measureAttr = {
-    val factTable = tableMeta.carbonTable.getFactTableName
+    val factTable = carbonTable.getTableName
     new LinkedHashSet(
-      tableMeta.carbonTable.
-        getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
-        asScala.asJava).asScala.toSeq.map { x =>
+      carbonTable.getMeasureByTableName(carbonTable.getTableName).asScala.asJava).asScala.toSeq
+      .map { x =>
       val metastoreType = metaData.carbonTable.getMeasureByName(factTable, x.getColName)
         .getDataType.getName.toLowerCase match {
         case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
@@ -131,7 +130,7 @@ case class CarbonRelation(
   }
 
   override val output = {
-    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+    val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
       .asScala
     // convert each column to Attribute
     columns.filter(!_.isInvisible).map { column =>
@@ -196,12 +195,11 @@ case class CarbonRelation(
 
   def sizeInBytes: Long = {
     val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
-      tableMeta.carbonTable.getAbsoluteTableIdentifier)
+      carbonTable.getAbsoluteTableIdentifier)
 
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       val tablePath = CarbonStorePath.getCarbonTablePath(
-        tableMeta.storePath,
-        tableMeta.carbonTableIdentifier).getPath
+        carbonTable.getAbsoluteTableIdentifier).getPath
       val fileType = FileFactory.getFileType(tablePath)
       if(FileFactory.isFileExist(tablePath, fileType)) {
         tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime

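CarbonRelation now carries the CarbonTable itself as its fourth field instead of a
TableMeta. Constructing the slimmed-down relation, mirroring the metastore hunks above
(sketch; assumes a loaded carbonTable):

    val relation = CarbonRelation(
      carbonTable.getDatabaseName,
      carbonTable.getTableName,
      CarbonSparkUtil.createSparkMeta(carbonTable),
      carbonTable)
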
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index e587395..b0aecd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events._
 
 /**
@@ -106,15 +107,15 @@ class CarbonSessionCatalog(
       alias: Option[String],
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
     var isRefreshed = false
-    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    val storePath = CarbonProperties.getStorePath
     carbonEnv.carbonMetastore.
       checkSchemasModifiedTimeAndReloadTables()
 
-    val tableMeta = carbonEnv.carbonMetastore
-      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
-        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
-    if (tableMeta.isEmpty || (tableMeta.isDefined &&
-        tableMeta.get.carbonTable.getTableLastUpdatedTime !=
+    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+      carbonDatasourceHadoopRelation.carbonTable.getTableName)
+    if (table.isEmpty || (table.isDefined &&
+        table.get.getTableLastUpdatedTime !=
           carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
       refreshTable(identifier)
       DataMapStoreManager.getInstance().

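The session catalog's staleness test now compares last-updated timestamps on
CarbonTable values directly. Condensed form, with db, tbl and incoming as placeholders
for the relation being refreshed (illustrative):

    val cached = carbonEnv.carbonMetastore.getTableFromMetadataCache(db, tbl)
    val needsRefresh = cached.isEmpty ||
      cached.get.getTableLastUpdatedTime != incoming.getTableLastUpdatedTime
    if (needsRefresh) {
      refreshTable(identifier)
    }
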
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index b76b24e..9cc5d86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -42,11 +42,11 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     var databaseLocation = ""
     try {
       databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-        CarbonEnv.getInstance(sparkSession).storePath)
+        CarbonProperties.getStorePath)
     } catch {
       case e: NoSuchDatabaseException =>
        // ignore the exception, as it will be handled by the Hive command.run
-      databaseLocation = CarbonEnv.getInstance(sparkSession).storePath
+      databaseLocation = CarbonProperties.getStorePath
     }
     // DropHiveDB command will fail if cascade is false and one or more tables exist in the database
     if (command.cascade && tablesInDB != null) {