Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 06:48:08 UTC

[1/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface

Repository: carbondata
Updated Branches:
  refs/heads/master b6777fcc3 -> 5fc7f06f2


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 153b169..07491d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -65,7 +65,7 @@ object AlterTableUtil {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
     // acquire the lock first
-    val table = relation.tableMeta.carbonTable
+    val table = relation.carbonTable
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
@@ -133,7 +133,7 @@ object AlterTableUtil {
       thriftTable: TableInfo)(sparkSession: SparkSession,
       sessionState: CarbonSessionState): Unit = {
     val dbName = carbonTable.getDatabaseName
-    val tableName = carbonTable.getFactTableName
+    val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
       .updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
@@ -232,10 +232,7 @@ object AlterTableUtil {
   def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
-
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -262,9 +259,7 @@ object AlterTableUtil {
   def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -297,9 +292,7 @@ object AlterTableUtil {
   def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -343,30 +336,27 @@ object AlterTableUtil {
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     var timeStamp = 0L
-    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
     var carbonTable: CarbonTable = null
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       // get the latest carbon table
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
         carbonTable.getCarbonTableIdentifier)
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(thriftTableInfo,
-          dbName,
-          tableName,
-          carbonTable.getTablePath)
+      val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+        thriftTableInfo,
+        dbName,
+        tableName,
+        carbonTable.getTablePath)
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
       schemaEvolutionEntry.setTimeStamp(timeStamp)
-      val thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(
+        wrapperTableInfo, dbName, tableName)
       val tblPropertiesMap: mutable.Map[String, String] =
         thriftTable.fact_table.getTableProperties.asScala
       if (set) {
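
The recurring edit in this file swaps the verbose metastore lookup (lookupRelation, cast to CarbonRelation, then tableMeta.carbonTable) for the shared CarbonEnv.getCarbonTable helper, removing the TableMeta indirection at every call site. A minimal sketch of the new call pattern, assuming an active SparkSession; "db" and "t" are placeholder names for an existing CarbonData table:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.util.path.CarbonStorePath

    def resolveTable(sparkSession: SparkSession) = {
      // Shared helper replaces lookupRelation + asInstanceOf[CarbonRelation]
      // + tableMeta.carbonTable at every call site in this file.
      val table = CarbonEnv.getCarbonTable(Some("db"), "t")(sparkSession)
      // Path and identifier now come straight from the CarbonTable itself.
      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
        table.getTablePath, table.getCarbonTableIdentifier)
      (table, carbonTablePath)
    }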

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index dcfbaea..c05c0f1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -40,9 +39,7 @@ object CleanFiles {
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
       storePath: String, forceTableClean: Boolean = false): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
     CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
   }
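
With the lookup folded into CarbonEnv, the public entry point is unchanged for callers. A usage sketch under the signature shown above; the names and store path are placeholders:

    // "db", "t" and the store path are placeholders; spark is assumed to be
    // an active SparkSession with the CarbonData extensions enabled.
    CleanFiles.cleanFiles(spark, "db", "t", "/tmp/carbon/store")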
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 8375762..d682b21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -17,7 +17,6 @@
  package org.apache.spark.util
 
  import org.apache.spark.sql.{CarbonEnv, SparkSession}
- import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -30,9 +29,7 @@ object DeleteSegmentByDate {
   def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
       dateValue: String): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
     CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 9b87504..5b58c8d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -34,9 +34,7 @@ object DeleteSegmentById {
   def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
       segmentIds: Seq[String]): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
     CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 23cba20..287191c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -245,7 +245,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table add partition: List Partition") {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -286,7 +286,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     
     sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
     val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
-    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
     val partitionIds2 = partitionInfo2.getPartitionIds
     val list_info2 = partitionInfo2.getListInfo
     assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -304,7 +304,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table add partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -342,7 +342,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val range_info1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -373,7 +373,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition: List Partition") {
     sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5).map(Integer.valueOf(_)).asJava)
@@ -415,7 +415,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val list_info1 = partitionInfo1.getListInfo
     assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava)
@@ -438,7 +438,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("""ALTER TABLE list_table_country ADD PARTITION ('(Part1, Part2, Part3, Part4)')""".stripMargin)
     sql("""ALTER TABLE list_table_country SPLIT PARTITION(9) INTO ('Part4', 'Part2', '(Part1, Part3)')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
@@ -485,7 +485,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('(One,Two, Three, Four)')""".stripMargin)
     sql("""ALTER TABLE list_table_area SPLIT PARTITION(6) INTO ('One', '(Two, Three )', 'Four')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 3, 4, 5, 7, 8, 9).map(Integer.valueOf(_)).asJava)
@@ -528,7 +528,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -566,7 +566,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val rangeInfo1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -586,7 +586,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition: Range Partition + Bucket") {
     sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -624,7 +624,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val rangeInfo1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -642,7 +642,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
     val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
     val partitionIds2 = partitionInfo2.getPartitionIds
     val rangeInfo2 = partitionInfo2.getRangeInfo
     assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava)
@@ -659,7 +659,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
     val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getTableName)
     val partitionIds3 = partitionInfo3.getPartitionIds
     val rangeInfo3 = partitionInfo3.getRangeInfo
     assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava)
@@ -789,7 +789,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("ALTER TABLE carbon_table_default_db ADD PARTITION ('2017')")
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_carbon_table_default_db")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)
@@ -809,7 +809,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("ALTER TABLE carbondb.carbontable ADD PARTITION ('2017')")
 
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("carbondb_carbontable")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val range_info1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)
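
Every assertion in this suite follows the same shape; only the accessor changes from getFactTableName to getTableName, since the PartitionInfo lookup is keyed by the table name in both cases. A condensed sketch of the pattern, using the same table key as the tests above:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.CarbonMetadata

    // Fetch the table from the metadata cache and read its partition layout.
    val carbonTable = CarbonMetadata.getInstance()
      .getCarbonTable("default_list_table_area")
    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
    val partitionIds = partitionInfo.getPartitionIds
    assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)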

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index a3024be..3f5d8c6 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -43,12 +43,12 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     header: String,
     allDictFilePath: String): CarbonLoadModel = {
     val carbonLoadModel = new CarbonLoadModel
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    val table = relation.tableMeta.carbonTable
+    carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+    carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+    val table = relation.carbonTable
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
@@ -141,10 +141,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   test("Support generate global dictionary from all dictionary files") {
     val header = "id,name,city,age"
     val carbonLoadModel = buildCarbonLoadModel(sampleRelation, null, header, sampleAllDictionaryFile)
-    GlobalDictionaryUtil
-      .generateGlobalDictionary(sqlContext,
-        carbonLoadModel,
-        sampleRelation.tableMeta.storePath)
+    GlobalDictionaryUtil.generateGlobalDictionary(
+      sqlContext, carbonLoadModel, sampleRelation.carbonTable.getTablePath)
 
     DictionaryTestCaseUtil.
       checkDictionary(sampleRelation, "city", "shenzhen")
@@ -156,7 +154,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     GlobalDictionaryUtil
       .generateGlobalDictionary(sqlContext,
       carbonLoadModel,
-      complexRelation.tableMeta.storePath)
+      complexRelation.carbonTable.getTablePath)
 
     DictionaryTestCaseUtil.
       checkDictionary(complexRelation, "channelsId", "1650")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index 930de43..4551120 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -38,9 +38,9 @@ object DictionaryTestCaseUtil {
    * @param value  a value of column
    */
   def checkDictionary(relation: CarbonRelation, columnName: String, value: String) {
-    val table = relation.tableMeta.carbonTable
-    val dimension = table.getDimensionByName(table.getFactTableName, columnName)
-    val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid")
+    val table = relation.carbonTable
+    val dimension = table.getDimensionByName(table.getTableName, columnName)
+    val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getTableName, "uniqueid")
     val  absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
     val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
       dimension.getColumnIdentifier, dimension.getDataType,
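
With TableMeta gone, both identifiers needed for a dictionary lookup are derivable from the CarbonTable alone. A minimal sketch of that construction; the "uniqueid" literal mirrors the test above:

    import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    def identifiersFor(table: CarbonTable): AbsoluteTableIdentifier = {
      // Both identifiers are built from CarbonTable accessors rather than
      // from the removed relation.tableMeta fields.
      val tableIdentifier = new CarbonTableIdentifier(
        table.getDatabaseName, table.getTableName, "uniqueid")
      new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
    }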

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index d37a68b..78ae384 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -151,12 +151,12 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       extColFilePath: String,
       csvDelimiter: String = ","): CarbonLoadModel = {
     val carbonLoadModel = new CarbonLoadModel
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    val table = relation.tableMeta.carbonTable
+    carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+    carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+    val table = relation.carbonTable
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
@@ -198,7 +198,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
       header, extColDictFilePath1)
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      extComplexRelation.tableMeta.storePath)
+      extComplexRelation.carbonTable.getTablePath)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
       extComplexRelation, "deviceInformationId", "10086")
@@ -207,7 +207,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
       header, extColDictFilePath2)
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      extComplexRelation.tableMeta.storePath)
+      extComplexRelation.carbonTable.getTablePath)
     // check the old dictionary and whether the new distinct value is generated
     DictionaryTestCaseUtil.checkDictionary(
       extComplexRelation, "deviceInformationId", "10086")
@@ -220,7 +220,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
       header, extColDictFilePath3)
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      extComplexRelation.tableMeta.storePath)
+      extComplexRelation.carbonTable.getTablePath)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
       extComplexRelation, "channelsId", "1421|")
@@ -229,7 +229,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     carbonLoadModel = buildCarbonLoadModel(verticalDelimiteRelation, complexFilePath2,
       header2, extColDictFilePath3, "|")
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      verticalDelimiteRelation.tableMeta.storePath)
+      verticalDelimiteRelation.carbonTable.getTablePath)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
       verticalDelimiteRelation, "channelsId", "1431,")
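
Throughout these tests the dictionary generator now receives the table path from the CarbonTable instead of the old tableMeta.storePath field. A minimal sketch, assuming sqlContext, a built CarbonLoadModel, and a resolved CarbonRelation are in scope:

    // relation.carbonTable.getTablePath replaces the removed
    // relation.tableMeta.storePath as the dictionary output location.
    GlobalDictionaryUtil.generateGlobalDictionary(
      sqlContext, carbonLoadModel, relation.carbonTable.getTablePath)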

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 442d93e..71c3dc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -182,9 +182,9 @@ public final class DataLoadProcessBuilder {
         loadModel.getBadRecordsLocation());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     Map<String, String> dateFormatMap =
         CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
     List<DataField> dataFields = new ArrayList<>();
@@ -209,7 +209,7 @@ public final class DataLoadProcessBuilder {
       }
     }
     configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
-    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getTableName()));
     // configuration for one pass load: dictionary server info
     configuration.setUseOnePass(loadModel.getUseOnePass());
     configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
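
On the Java side the rename is mechanical: every schema lookup keyed by the fact table name now uses getTableName(). An equivalent sketch against the same CarbonTable API, written in Scala for consistency with the examples above and assuming a resolved carbonTable:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    def schemaParts(carbonTable: CarbonTable) = {
      // Dimension, measure and bucketing lookups are all keyed by table name.
      val dimensions = carbonTable.getDimensionByTableName(carbonTable.getTableName)
      val measures   = carbonTable.getMeasureByTableName(carbonTable.getTableName)
      val buckets    = carbonTable.getBucketingInfo(carbonTable.getTableName)
      (dimensions, measures, buckets)
    }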

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index be3572c..65f70a0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -209,7 +209,7 @@ public class CarbonCompactionExecutor {
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dim : dimensions) {
       // check if dimension is deleted
       QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -220,7 +220,7 @@ public class CarbonCompactionExecutor {
 
     List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     for (CarbonMeasure carbonMeasure : measures) {
       // check if measure is deleted
       QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 08b8600..c60bb24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -308,7 +308,7 @@ public class CarbonCompactionUtil {
   public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
       CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
     List<CarbonDimension> masterDimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
     for (CarbonDimension dimension : masterDimensions) {
       Integer value = columnCardinalityMap.get(dimension.getColumnId());
@@ -321,7 +321,7 @@ public class CarbonCompactionUtil {
     }
     // add measures to the column schema list
     List<CarbonMeasure> masterSchemaMeasures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     for (CarbonMeasure measure : masterSchemaMeasures) {
       updatedColumnSchemaList.add(measure.getColumnSchema());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c1df349..8f6d19c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1264,7 +1264,7 @@ public final class CarbonDataMergerUtil {
       lockStatus = carbonLock.lockWithRetries();
       if (lockStatus) {
         LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
@@ -1284,18 +1284,18 @@ public final class CarbonDataMergerUtil {
         }
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
       }
     } finally {
       if (lockStatus) {
         if (carbonLock.unlock()) {
           LOGGER.info(
                  "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                          + "." + table.getFactTableName());
+                          + "." + table.getTableName());
         } else {
           LOGGER.error(
                   "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status updation");
+                          .getTableName() + " during table status updation");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
deleted file mode 100644
index 09dbfff..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.merger;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-public class TableMeta implements Serializable {
-
-  private static final long serialVersionUID = -1749874611119829431L;
-
-  public CarbonTableIdentifier carbonTableIdentifier;
-  public String storePath;
-  public CarbonTable carbonTable;
-  public String tablePath;
-
-  public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, String tablePath,
-      CarbonTable carbonTable) {
-    this.carbonTableIdentifier = carbonTableIdentifier;
-    this.storePath = storePath;
-    this.tablePath = tablePath;
-    this.carbonTable = carbonTable;
-  }
-
-}
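
TableMeta was a serializable four-field carrier (identifier, store path, table path, CarbonTable); each field maps to an accessor on CarbonTable itself, which is what lets the call sites above drop the wrapper. A sketch of the correspondence, assuming a resolved CarbonTable:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    def formerTableMetaFields(table: CarbonTable): Unit = {
      val identifier = table.getCarbonTableIdentifier // was tableMeta.carbonTableIdentifier
      val tablePath  = table.getTablePath             // was tableMeta.tablePath / storePath
      val dbName     = table.getDatabaseName
      val tableName  = table.getTableName             // was getFactTableName
    }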

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index aeddac6..36e022b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -78,7 +78,7 @@ public abstract class AbstractCarbonQueryExecutor {
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dim : dimensions) {
       // check if dimension is deleted
       QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -89,7 +89,7 @@ public abstract class AbstractCarbonQueryExecutor {
 
     List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     for (CarbonMeasure carbonMeasure : measures) {
       // check if measure is deleted
       QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 1db414f..48c5471 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -46,7 +46,7 @@ public class RowResultProcessor {
       SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
     this.segmentProperties = segProp;
-    String tableName = carbonTable.getFactTableName();
+    String tableName = carbonTable.getTableName();
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
             segProp, tableName, tempStoreLocation);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 504e7ec..75fcea3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.processing.store;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,19 +41,15 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
-import org.apache.carbondata.processing.store.file.FileManager;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 
@@ -77,10 +72,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private CarbonFactDataWriter dataWriter;
 
   /**
-   * File manager
-   */
-  private IFileManagerComposite fileManager;
-  /**
    * total number of entries in blocklet
    */
   private int entryCount;
@@ -91,11 +82,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private int pageSize;
 
-  // This variable is true if it is dictionary dimension and its cardinality is lower than
-  // property of CarbonCommonConstants.HIGH_CARDINALITY_VALUE
-  // It decides whether it will do RLE encoding on data page for this dimension
-  private boolean[] rleEncodingForDictDimension;
-  private boolean[] isNoDictionary;
   private long processedDataCount;
   private ExecutorService producerExecutorService;
   private List<Future<Void>> producerExecutorServiceTaskList;
@@ -130,12 +116,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private boolean processingComplete;
 
   /**
-   * boolean to check whether dimension
-   * is of dictionary type or no dictionary type
-   */
-  private boolean[] isDictDimension;
-
-  /**
    * current data format version
    */
   private ColumnarFormatVersion version;
@@ -146,47 +126,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) {
     this.model = model;
     initParameters(model);
-
-    int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
-        + getExpandedComplexColsCount();
-    this.rleEncodingForDictDimension = new boolean[numDimColumns];
-    this.isNoDictionary = new boolean[numDimColumns];
-
-    int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
-    // setting true value for dims of high card
-    for (int i = 0; i < model.getNoDictionaryCount(); i++) {
-      this.isNoDictionary[noDictStartIndex + i] = true;
-    }
-
-    boolean isAggKeyBlock = Boolean.parseBoolean(
-        CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
-            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
-    if (isAggKeyBlock) {
-      int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
-      for (int i = 0; i < model.getTableSpec().getNumSimpleDimensions(); i++) {
-        if (model.getSegmentProperties().getDimensions().get(i).isGlobalDictionaryEncoding()) {
-          this.rleEncodingForDictDimension[i] = true;
-        }
-      }
-
-      if (model.getDimensionCount() < dimLens.length) {
-        int allColsCount = getColsCount(model.getDimensionCount());
-        List<Boolean> rleWithComplex = new ArrayList<Boolean>(allColsCount);
-        for (int i = 0; i < model.getDimensionCount(); i++) {
-          GenericDataType complexDataType = model.getComplexIndexMap().get(i);
-          if (complexDataType != null) {
-            complexDataType.fillAggKeyBlock(rleWithComplex, this.rleEncodingForDictDimension);
-          } else {
-            rleWithComplex.add(this.rleEncodingForDictDimension[i]);
-          }
-        }
-        this.rleEncodingForDictDimension = new boolean[allColsCount];
-        for (int i = 0; i < allColsCount; i++) {
-          this.rleEncodingForDictDimension[i] = rleWithComplex.get(i);
-        }
-      }
-    }
     this.version = CarbonProperties.getInstance().getFormatVersion();
     StringBuffer noInvertedIdxCol = new StringBuffer();
     for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) {
@@ -202,13 +141,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     SortScopeOptions.SortScope sortScope = model.getSortScope();
     this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
 
-    //TODO need to pass carbon table identifier to metadata
-    CarbonTable carbonTable =
-        CarbonMetadata.getInstance().getCarbonTable(
-            model.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + model.getTableName());
-    isDictDimension =
-        CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(model.getTableName()));
-
     // in compaction flow the measure with decimal type will come as spark decimal.
     // need to convert it to byte array.
     if (model.isCompactionFlow()) {
@@ -247,19 +179,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
   }
 
-  private boolean[] arrangeUniqueBlockType(boolean[] aggKeyBlock) {
-    int counter = 0;
-    boolean[] uniqueBlock = new boolean[aggKeyBlock.length];
-    for (int i = 0; i < isDictDimension.length; i++) {
-      if (isDictDimension[i]) {
-        uniqueBlock[i] = aggKeyBlock[counter++];
-      } else {
-        uniqueBlock[i] = false;
-      }
-    }
-    return uniqueBlock;
-  }
-
   private void setComplexMapSurrogateIndex(int dimensionCount) {
     int surrIndex = 0;
     for (int i = 0; i < dimensionCount; i++) {
@@ -283,9 +202,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @throws CarbonDataWriterException
    */
   public void initialise() throws CarbonDataWriterException {
-    fileManager = new FileManager();
-    // todo: the fileManager seems to be useless, remove it later
-    fileManager.setName(new File(model.getStoreLocation()[0]).getName());
     setWritingConfiguration();
   }
 
@@ -412,27 +328,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
   }
 
-  private int getColsCount(int columnSplit) {
-    int count = 0;
-    for (int i = 0; i < columnSplit; i++) {
-      GenericDataType complexDataType = model.getComplexIndexMap().get(i);
-      if (complexDataType != null) {
-        count += complexDataType.getColsCount();
-      } else count++;
-    }
-    return count;
-  }
-
   // return the number of complex column after complex columns are expanded
   private int getExpandedComplexColsCount() {
     return model.getExpandedComplexColsCount();
   }
 
-  // return the number of complex column
-  private int getComplexColumnCount() {
-    return model.getComplexIndexMap().size();
-  }
-
   /**
    * below method will be used to close the handler
    */
@@ -519,7 +419,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dataWriter = getFactDataWriter();
     // initialize the channel;
     this.dataWriter.initializeWriter();
-    //initializeColGrpMinMax();
   }
 
   /**
@@ -571,14 +470,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
     carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
     carbonDataWriterVo.setTableName(model.getTableName());
-    carbonDataWriterVo.setFileManager(fileManager);
-    carbonDataWriterVo.setRleEncodingForDictDim(rleEncodingForDictDimension);
-    carbonDataWriterVo.setIsComplexType(isComplexTypes());
     carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
     carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
     carbonDataWriterVo.setDatabaseName(model.getDatabaseName());
     carbonDataWriterVo.setWrapperColumnSchemaList(model.getWrapperColumnSchema());
-    carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
     carbonDataWriterVo.setCarbonDataDirectoryPath(model.getCarbonDataDirectoryPath());
     carbonDataWriterVo.setColCardinality(model.getColCardinality());
     carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
@@ -590,31 +485,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return carbonDataWriterVo;
   }
 
-  private boolean[] isComplexTypes() {
-    int noDictionaryCount = model.getNoDictionaryCount();
-    int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + getComplexColumnCount();
-    int allColsCount = getColsCount(noOfColumn);
-    boolean[] isComplexType = new boolean[allColsCount];
-
-    List<Boolean> complexTypesList = new ArrayList<Boolean>(allColsCount);
-    for (int i = 0; i < noOfColumn; i++) {
-      GenericDataType complexDataType = model.getComplexIndexMap().get(i - noDictionaryCount);
-      if (complexDataType != null) {
-        int count = complexDataType.getColsCount();
-        for (int j = 0; j < count; j++) {
-          complexTypesList.add(true);
-        }
-      } else {
-        complexTypesList.add(false);
-      }
-    }
-    for (int i = 0; i < allColsCount; i++) {
-      isComplexType[i] = complexTypesList.get(i);
-    }
-
-    return isComplexType;
-  }
-
   /**
    * This method will reset the block processing count
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
deleted file mode 100644
index ddd9bf2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-
-public class FileData extends FileManager {
-
-  /**
-   * Store Path
-   */
-  private String storePath;
-
-  /**
-   * hierarchyValueWriter
-   */
-
-  public FileData(String fileName, String storePath) {
-    this.fileName = fileName;
-    this.storePath = storePath;
-  }
-
-  /**
-   * @return Returns the carbonDataFileTempPath.
-   */
-  public String getFileName() {
-    return fileName;
-  }
-
-  /**
-   * @return Returns the storePath.
-   */
-  public String getStorePath() {
-    return storePath;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
deleted file mode 100644
index cfa3a66..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class FileManager implements IFileManagerComposite {
-  /**
-   * listOfFileData, composite parent which holds the different objects
-   */
-  protected List<IFileManagerComposite> listOfFileData =
-      new ArrayList<IFileManagerComposite>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-  protected String fileName;
-
-  @Override public void add(IFileManagerComposite customData) {
-    listOfFileData.add(customData);
-  }
-
-  @Override public void remove(IFileManagerComposite customData) {
-    listOfFileData.remove(customData);
-
-  }
-
-  @Override public IFileManagerComposite get(int i) {
-    return listOfFileData.get(i);
-  }
-
-  @Override public void setName(String name) {
-    this.fileName = name;
-  }
-
-  /**
-   * Return the size
-   */
-  public int size() {
-    return listOfFileData.size();
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
deleted file mode 100644
index 6691772..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-public interface IFileManagerComposite {
-  /**
-   * Add the data which can be either row Folder(Composite) or File
-   *
-   * @param customData
-   */
-  void add(IFileManagerComposite customData);
-
-  /**
-   * Remove the CustomData type object from the IFileManagerComposite object hierarchy.
-   *
-   * @param customData
-   */
-  void remove(IFileManagerComposite customData);
-
-  /**
-   * get the CustomData type object name
-   *
-   * @return CustomDataIntf type
-   */
-  IFileManagerComposite get(int i);
-
-  /**
-   * set the CustomData type object name
-   *
-   * @param name
-   */
-  void setName(String name);
-
-  /**
-   * Get the size
-   *
-   * @return
-   */
-  int size();
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 1b6ba72..855ec03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -60,7 +60,6 @@ import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
-import org.apache.carbondata.processing.store.file.FileData;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.io.IOUtils;
@@ -317,9 +316,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
             dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(),
             "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
-    String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    FileData fileData = new FileData(actualFileNameVal, chosenTempLocation);
-    dataWriterVo.getFileManager().add(fileData);
     this.carbonDataFileTempPath = chosenTempLocation + File.separator
         + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
     this.fileCount++;
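
The three files deleted above formed a small composite (FileData, FileManager,
IFileManagerComposite) whose only remaining job was tracking in-progress file
names, and this writer hunk drops the last registration of a FileData. What is
left is plain path construction. A minimal sketch of that construction, with
the constant's value assumed rather than taken from CarbonCommonConstants:

    import java.io.File

    // Assumed stand-in for CarbonCommonConstants.FILE_INPROGRESS_STATUS.
    val FILE_INPROGRESS_STATUS = ".inprogress"

    // The writer now derives the temp path directly instead of registering
    // a FileData with a file manager.
    def tempDataFilePath(tempLocation: String, dataFileName: String): String =
      tempLocation + File.separator + dataFileName + FILE_INPROGRESS_STATUS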

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 26fff09..79cdd95 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -22,7 +22,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 
 /**
  * Value object for writing the data
@@ -35,12 +34,6 @@ public class CarbonDataWriterVo {
 
   private String tableName;
 
-  private IFileManagerComposite fileManager;
-
-  private boolean[] rleEncodingForDictDim;
-
-  private boolean[] isComplexType;
-
   private int NoDictionaryCount;
 
   private CarbonDataFileAttributes carbonDataFileAttributes;
@@ -49,8 +42,6 @@ public class CarbonDataWriterVo {
 
   private List<ColumnSchema> wrapperColumnSchemaList;
 
-  private boolean[] isDictionaryColumn;
-
   private String carbonDataDirectoryPath;
 
   private int[] colCardinality;
@@ -110,48 +101,6 @@ public class CarbonDataWriterVo {
   }
 
   /**
-   * @return the fileManager
-   */
-  public IFileManagerComposite getFileManager() {
-    return fileManager;
-  }
-
-  /**
-   * @param fileManager the fileManager to set
-   */
-  public void setFileManager(IFileManagerComposite fileManager) {
-    this.fileManager = fileManager;
-  }
-
-  /**
-   * @return the rleEncodingForDictDim
-   */
-  public boolean[] getRleEncodingForDictDim() {
-    return rleEncodingForDictDim;
-  }
-
-  /**
-   * @param rleEncodingForDictDim the rleEncodingForDictDim to set
-   */
-  public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) {
-    this.rleEncodingForDictDim = rleEncodingForDictDim;
-  }
-
-  /**
-   * @return the isComplexType
-   */
-  public boolean[] getIsComplexType() {
-    return isComplexType;
-  }
-
-  /**
-   * @param isComplexType the isComplexType to set
-   */
-  public void setIsComplexType(boolean[] isComplexType) {
-    this.isComplexType = isComplexType;
-  }
-
-  /**
    * @return the noDictionaryCount
    */
   public int getNoDictionaryCount() {
@@ -208,20 +157,6 @@ public class CarbonDataWriterVo {
   }
 
   /**
-   * @return the isDictionaryColumn
-   */
-  public boolean[] getIsDictionaryColumn() {
-    return isDictionaryColumn;
-  }
-
-  /**
-   * @param isDictionaryColumn the isDictionaryColumn to set
-   */
-  public void setIsDictionaryColumn(boolean[] isDictionaryColumn) {
-    this.isDictionaryColumn = isDictionaryColumn;
-  }
-
-  /**
    * @return the carbonDataDirectoryPath
    */
   public String getCarbonDataDirectoryPath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index ca40830..7218a12 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -383,7 +383,7 @@ public final class CarbonDataProcessorUtil {
    */
   public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) {
     Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    String factTableName = schema.getCarbonTable().getFactTableName();
+    String factTableName = schema.getCarbonTable().getTableName();
     if (tableName.equals(factTableName)) {
       List<CarbonDimension> dimensions =
           schema.getCarbonTable().getDimensionByTableName(factTableName);
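
The getFactTableName-to-getTableName rename seen here recurs through the rest
of the patch (StoreCreator, StreamSegment, the Spark commands); only the
accessor name changes, not the value it returns. A tiny sketch of the
call-site pattern, using a hypothetical TableLike stand-in rather than the
real CarbonTable:

    // Stand-in trait showing only the renamed accessor.
    trait TableLike {
      def getDatabaseName: String
      def getTableName: String   // formerly getFactTableName
    }

    def qualifiedName(t: TableLike): String =
      t.getDatabaseName + "." + t.getTableName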

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 29a979d..db3442e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -263,6 +263,11 @@ public final class CarbonLoaderUtil {
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+    String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+    FileType fileType = FileFactory.getFileType(metadataPath);
+    if (!FileFactory.isFileExist(metadataPath, fileType)) {
+      FileFactory.mkdirs(metadataPath, fileType);
+    }
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
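
The added lines make recordNewLoadMetadata create the Metadata directory
before the table status file is written, so a first load against a fresh
table path no longer depends on the directory being pre-created. The idiom,
as a sketch using only the FileFactory calls visible in the hunk:

    import org.apache.carbondata.core.datastore.impl.FileFactory

    // Create the directory if it does not exist yet (mirrors the hunk above).
    def ensureDir(path: String): Unit = {
      val fileType = FileFactory.getFileType(path)
      if (!FileFactory.isFileExist(path, fileType)) {
        FileFactory.mkdirs(path, fileType)
      }
    }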

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 58cc019..e09e3db 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -297,9 +297,9 @@ public class StoreCreator {
     String header = reader.readLine();
     String[] split = header.split(",");
     List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
-    List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName());
+    List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
     allCols.addAll(dims);
-    List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName());
+    List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
     allCols.addAll(msrs);
     Set<String>[] set = new HashSet[dims.size()];
     for (int i = 0; i < set.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 943858d..7682437 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -66,7 +66,7 @@ public class StreamSegment {
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
-            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
                 + " for stream table get or create segment");
 
         LoadMetadataDetails[] details =
@@ -104,17 +104,17 @@ public class StreamSegment {
       } else {
         LOGGER.error(
             "Not able to acquire the lock for stream table get or create segment for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
         throw new IOException("Failed to get stream segment");
       }
     } finally {
       if (carbonLock.unlock()) {
         LOGGER.info("Table unlocked successfully after stream table get or create segment" + table
-            .getDatabaseName() + "." + table.getFactTableName());
+            .getDatabaseName() + "." + table.getTableName());
       } else {
         LOGGER.error(
             "Unable to unlock table lock for stream table" + table.getDatabaseName() + "." + table
-                .getFactTableName() + " during stream table get or create segment");
+                .getTableName() + " during stream table get or create segment");
       }
     }
   }
@@ -132,7 +132,7 @@ public class StreamSegment {
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
-            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
                 + " for stream table finish segment");
 
         LoadMetadataDetails[] details =
@@ -165,17 +165,17 @@ public class StreamSegment {
       } else {
         LOGGER.error(
             "Not able to acquire the lock for stream table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
         throw new IOException("Failed to get stream segment");
       }
     } finally {
       if (carbonLock.unlock()) {
         LOGGER.info(
             "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                + "." + table.getFactTableName());
+                + "." + table.getTableName());
       } else {
         LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-            .getFactTableName() + " during table status updation");
+            .getTableName() + " during table status updation");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 31ed1f6..2c4d35f 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -147,7 +147,7 @@ object StreamSinkFactory {
     val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
     optionsFinal.put("sort_scope", "no_sort")
     if (parameters.get("fileheader").isEmpty) {
-      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
         .asScala.map(_.getColName).mkString(","))
     }
     val carbonLoadModel = new CarbonLoadModel()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index 6ee3296..c2789f4 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -42,14 +42,14 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
         LockUsage.STREAMING_LOCK)
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
-                    carbonTable.getFactTableName)
+                    carbonTable.getTableName)
         cache.put(event.id, lock)
       } else {
         LOGGER.error("Not able to acquire the lock for stream table:" +
-                     carbonTable.getDatabaseName + "." + carbonTable.getFactTableName)
+                     carbonTable.getDatabaseName + "." + carbonTable.getTableName)
         throw new InterruptedException(
           "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
-          carbonTable.getFactTableName)
+          carbonTable.getTableName)
       }
     }
   }


[2/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface

Posted by qi...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 822455c..64a066c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -47,9 +47,7 @@ case class CarbonDataMapShowCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
     if (schemaList != null && schemaList.size() > 0) {
       schemaList.asScala.map { s =>
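
The lookupRelation/asInstanceOf/tableMeta chain collapses into the new
CarbonEnv.getCarbonTable helper, and the same substitution repeats in the
show-loads, clean-files and delete-load commands below. Its usage shape, as a
sketch behind a hypothetical wrapper name:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // One call replaces the lookupRelation + cast + tableMeta chain.
    def resolveTable(dbOp: Option[String], table: String)
        (sparkSession: SparkSession): CarbonTable =
      CarbonEnv.getCarbonTable(dbOp, table)(sparkSession)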

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 66f2756..f34afbf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events._
 
 
@@ -60,12 +61,11 @@ case class CarbonDropDataMapCommand(
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier =
       AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
@@ -76,20 +76,19 @@ case class CarbonDropDataMapCommand(
         lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
-      val carbonTable: Option[CarbonTable] =
-        catalog.getTableFromMetadataCache(dbName, tableName) match {
-          case Some(tableMeta) => Some(tableMeta.carbonTable)
-          case None => try {
-            Some(catalog.lookupRelation(identifier)(sparkSession)
-              .asInstanceOf[CarbonRelation].metaData.carbonTable)
-          } catch {
-            case ex: NoSuchTableException =>
-              if (!ifExistsSet) {
-                throw ex
-              }
-              None
-          }
+      var carbonTable: Option[CarbonTable] =
+        catalog.getTableFromMetadataCache(dbName, tableName)
+      if (carbonTable.isEmpty) {
+        try {
+          carbonTable = Some(catalog.lookupRelation(identifier)(sparkSession)
+            .asInstanceOf[CarbonRelation].metaData.carbonTable)
+        } catch {
+          case ex: NoSuchTableException =>
+            if (!ifExistsSet) {
+              throw ex
+            }
         }
+      }
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
         val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
@@ -144,7 +143,7 @@ case class CarbonDropDataMapCommand(
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
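
Here, and in CleanFilesCommand below, the session-scoped
CarbonEnv.getInstance(sparkSession).storePath gives way to the static
CarbonProperties.getStorePath, which is the store path interface clean-up the
commit title refers to. How the configured store path feeds a table path, per
the hunk (databaseLocation itself comes from GetDB.getDatabaseLocation):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // New static accessor; no SparkSession needed.
    val storePath: String = CarbonProperties.getStorePath

    def tablePath(databaseLocation: String, tableName: String): String =
      databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase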

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 947cea1..2f04feb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -57,21 +57,21 @@ case class AlterTableCompactionCommand(
     if (relation == null) {
       sys.error(s"Table $databaseName.$tableName does not exist")
     }
-    if (null == relation.tableMeta.carbonTable) {
+    if (null == relation.carbonTable) {
       LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
       sys.error(s"alter table failed. table not found: $databaseName.$tableName")
     }
 
     val carbonLoadModel = new CarbonLoadModel()
 
-    val table = relation.tableMeta.carbonTable
-    carbonLoadModel.setTableName(table.getFactTableName)
+    val table = relation.carbonTable
+    carbonLoadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setTablePath(relation.tableMeta.carbonTable.getTablePath)
+    carbonLoadModel.setTableName(relation.carbonTable.getTableName)
+    carbonLoadModel.setDatabaseName(relation.carbonTable.getDatabaseName)
+    carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
 
     var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 2003bb1..32d6b80 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -37,9 +37,7 @@ case class CarbonShowLoadsCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
       GetDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 8b0dab7..58e33b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, Runn
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
 
 case class CleanFilesCommand(
@@ -38,7 +39,7 @@ case class CleanFilesCommand(
     if (forceTableClean) {
       val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
       val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-        CarbonEnv.getInstance(sparkSession).storePath)
+        CarbonProperties.getStorePath)
       // TODO: TAABLEPATH
       CarbonStore.cleanFiles(
         dbName,
@@ -47,10 +48,7 @@ case class CleanFilesCommand(
         null,
         forceTableClean)
     } else {
-      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val relation = catalog
-        .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
-      val carbonTable = relation.tableMeta.carbonTable
+      val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
       val cleanFilesPreEvent: CleanFilesPreEvent =
         CleanFilesPreEvent(carbonTable,
           sparkSession)
@@ -59,7 +57,7 @@ case class CleanFilesCommand(
       CarbonStore.cleanFiles(
         GetDB.getDatabaseName(databaseNameOp, sparkSession),
         tableName,
-        relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
+        CarbonProperties.getStorePath,
         carbonTable,
         forceTableClean)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
index 6a0465c..5b305ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
@@ -35,9 +35,7 @@ case class DeleteLoadByIdCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val operationContext = new OperationContext
 
     val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
index 83f41bb..00c35a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
@@ -37,9 +37,7 @@ case class DeleteLoadByLoadDateCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val operationContext = new OperationContext
     val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
       DeleteSegmentByDatePreEvent(carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
index 3f0e093..845a64c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
@@ -47,7 +47,7 @@ case class LoadTableByInsertCommand(
       Some(df)).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
     relation.carbonRelation.metaData =
-      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)
     load
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 777c169..0f4ca01 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -54,7 +55,8 @@ case class LoadTableCommand(
     isOverwriteTable: Boolean,
     var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None,
-    updateModel: Option[UpdateTableModel] = None)
+    updateModel: Option[UpdateTableModel] = None,
+    var tableInfoOp: Option[TableInfo] = None)
   extends RunnableCommand with DataProcessCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -72,16 +74,6 @@ case class LoadTableCommand(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    if (null == relation.tableMeta.carbonTable) {
-      LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
-      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
-      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
-    }
 
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -105,18 +97,30 @@ case class LoadTableCommand(
     // update the property with new value
     carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
 
-    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
-
-    val tableProperties = relation.tableMeta.carbonTable.getTableInfo
-      .getFactTable.getTableProperties
+    try {
+      val table = if (tableInfoOp.isDefined) {
+        CarbonTable.buildFromTableInfo(tableInfoOp.get)
+      } else {
+        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        if (relation == null) {
+          sys.error(s"Table $dbName.$tableName does not exist")
+        }
+        if (null == relation.carbonTable) {
+          LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+          sys.error(s"Data loading failed. table not found: $dbName.$tableName")
+        }
+        relation.carbonTable
+      }
 
-    optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+      optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+          carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
-    try {
-      val table = relation.tableMeta.carbonTable
       val carbonLoadModel = new CarbonLoadModel()
       val factPath = if (dataFrame.isDefined) {
         ""
@@ -137,11 +141,9 @@ case class LoadTableCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
-        val storePath = relation.tableMeta.storePath
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty) {
-          CommonUtil.
-            readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable)
+          CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
         }
         if (isOverwriteTable) {
           LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
@@ -158,8 +160,7 @@ case class LoadTableCommand(
           carbonLoadModel.setUseOnePass(false)
         }
         // Create table and metadata folders if not exist
-        val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
         val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
         val fileType = FileFactory.getFileType(metadataDirectoryPath)
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
@@ -192,9 +193,9 @@ case class LoadTableCommand(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
-          val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+          val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
                                   table.getDatabaseName + "/" +
-                                  table.getFactTableName + "/"
+                                  table.getTableName + "/"
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory
@@ -234,7 +235,7 @@ case class LoadTableCommand(
       .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
     val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
     val dimensions = carbonTable.getDimensionByTableName(
-      carbonTable.getFactTableName).asScala.toArray
+      carbonTable.getTableName).asScala.toArray
     val colDictFilePath = carbonLoadModel.getColDictFilePath
     if (!StringUtils.isEmpty(colDictFilePath)) {
       carbonLoadModel.initPredefDictMap()
@@ -378,8 +379,7 @@ case class LoadTableCommand(
     val identifier = model.table.getCarbonTableIdentifier
     // update CarbonDataLoadSchema
     val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
-      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
   }
 }
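
LoadTableCommand now accepts an optional TableInfo; when a caller supplies
one, the CarbonTable is built directly and the metastore lookup (with its
table-not-found handling) is skipped. The resolution order, sketched with the
calls from the hunk and the lookup stubbed as a by-name parameter:

    import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}

    // Prefer an explicitly supplied TableInfo; otherwise fall back to the
    // metastore lookup.
    def resolveForLoad(tableInfoOp: Option[TableInfo])
        (lookup: => CarbonTable): CarbonTable =
      tableInfoOp.map(info => CarbonTable.buildFromTableInfo(info)).getOrElse(lookup)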

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a52008a..efb6796 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -64,24 +63,18 @@ object DeleteExecution {
       sparkSession: SparkSession,
       dataRdd: RDD[Row],
       timestamp: String,
-      relation: CarbonRelation,
       isUpdateOperation: Boolean,
-      executorErrors: ExecutionErrors
-  ): Boolean = {
+      executorErrors: ExecutionErrors): Boolean = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
     val tableName = getTableIdentifier(identifier).table
     val database = GetDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-
-    val absoluteTableIdentifier = relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+    val carbonTable = CarbonEnv.getCarbonTable(Some(database), tableName)(sparkSession)
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = CarbonStorePath
       .getCarbonTablePath(absoluteTableIdentifier)
     val factPath = carbonTablePath.getFactDir
 
-    val carbonTable = relation.tableMeta.carbonTable
     var deleteStatus = true
     val deleteRdd = if (isUpdateOperation) {
       val schema =
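
With the relation parameter gone, deleteDeltaExecution resolves the table and
fact directory itself, so the two callers further down
(ProjectForDeleteCommand, ProjectForUpdateCommand) each pass one argument
fewer. The internal resolution, as a sketch behind a hypothetical helper name:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.util.path.CarbonStorePath

    // Derive the fact directory from the env-resolved table.
    def factDirFor(database: String, tableName: String)
        (sparkSession: SparkSession): String = {
      val carbonTable = CarbonEnv.getCarbonTable(Some(database), tableName)(sparkSession)
      CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getFactDir
    }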

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 34daf4e..6762489 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -54,7 +54,7 @@ object HorizontalCompaction {
     }
 
     var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
-    val carbonTable = carbonRelation.tableMeta.carbonTable
+    val carbonTable = carbonRelation.carbonTable
     val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val updateTimeStamp = System.currentTimeMillis()
     // To make sure that update and delete timestamps are not same,
@@ -116,7 +116,7 @@ object HorizontalCompaction {
       factTimeStamp: Long,
       segLists: util.List[String]): Unit = {
     val db = carbonTable.getDatabaseName
-    val table = carbonTable.getFactTableName
+    val table = carbonTable.getTableName
     // get the valid segments qualified for update compaction.
     val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
       absTableIdentifier,
@@ -133,7 +133,7 @@ object HorizontalCompaction {
     try {
       // Update Compaction.
       val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
-        carbonTable.getFactTableName,
+        carbonTable.getTableName,
         Some(segmentUpdateStatusManager),
         CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
         Some(factTimeStamp),
@@ -167,7 +167,7 @@ object HorizontalCompaction {
       segLists: util.List[String]): Unit = {
 
     val db = carbonTable.getDatabaseName
-    val table = carbonTable.getFactTableName
+    val table = carbonTable.getTableName
     val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
       absTableIdentifier,
       segmentUpdateStatusManager,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index b18ab78..5817d88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -60,7 +60,7 @@ object IUDCommonUtil {
           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
             .getDatabaseName + "." +
           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-            .getFactTableName
+            .getTableName
         val sementProperty = carbonProperties
           .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbAndTb, "")
         if (!(sementProperty.equals("") || sementProperty.trim.equals("*"))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index a898822..cf5bfd8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -51,7 +51,7 @@ private[sql] case class ProjectForDeleteCommand(
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
       asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
+    val carbonTable = relation.carbonTable
 
     // trigger event for Delete from table
     val operationContext = new OperationContext
@@ -77,9 +77,8 @@ private[sql] case class ProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      if (DeleteExecution
-        .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
-          isUpdateOperation = false, executorErrors)) {
+      if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
+        isUpdateOperation = false, executorErrors)) {
         // call IUD Compaction.
         HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
           isUpdateOperation = false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 549c58f..da62f27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
 import org.apache.carbondata.processing.loading.FailureCauses
 
@@ -58,7 +57,7 @@ private[sql] case class ProjectForUpdateCommand(
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
       asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
+    val carbonTable = relation.carbonTable
 
     // trigger event for Update table
     val operationContext = new OperationContext
@@ -74,7 +73,7 @@ private[sql] case class ProjectForUpdateCommand(
     val currentTime = CarbonUpdateUtil.readCurrentTime
     //    var dataFrame: DataFrame = null
     var dataSet: DataFrame = null
-    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
+    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
@@ -83,7 +82,6 @@ private[sql] case class ProjectForUpdateCommand(
       else {
         throw new Exception("Table is locked for updation. Please try after some time")
       }
-      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // Get RDD.
 
       dataSet = if (isPersistEnabled) {
@@ -93,7 +91,7 @@ private[sql] case class ProjectForUpdateCommand(
       else {
         Dataset.ofRows(sparkSession, plan)
       }
-      var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
 
 
       // handle the clean up of IUD.
@@ -101,8 +99,7 @@ private[sql] case class ProjectForUpdateCommand(
 
       // do delete operation.
       DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
-        currentTime + "",
-        relation, isUpdateOperation = true, executionErrors)
+        currentTime + "", isUpdateOperation = true, executionErrors)
 
       if(executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index acd9bd3..5a0e4cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -65,8 +65,7 @@ case class AlterTableDropCarbonPartitionCommand(
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
-    val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val tablePath = relation.tableMeta.tablePath
+    val tablePath = relation.carbonTable.getTablePath
     carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
@@ -75,7 +74,7 @@ case class AlterTableDropCarbonPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
     }
-    val table = relation.tableMeta.carbonTable
+    val table = relation.carbonTable
     val partitionInfo = table.getPartitionInfo(tableName)
     if (partitionInfo == null) {
       sys.error(s"Table $tableName is not a partition table.")
@@ -101,7 +100,7 @@ case class AlterTableDropCarbonPartitionCommand(
         sys.error(s"Dropping range interval partition isn't support yet!")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -142,17 +141,13 @@ case class AlterTableDropCarbonPartitionCommand(
       locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
         locksToBeAcquired)(sparkSession)
       val carbonLoadModel = new CarbonLoadModel()
-      val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
-        .asInstanceOf[CarbonRelation]
-      val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-      val table = relation.tableMeta.carbonTable
+      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setTablePath(relation.tableMeta.tablePath)
+      carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setDatabaseName(table.getDatabaseName)
+      carbonLoadModel.setTablePath(table.getTablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       alterTableDropPartition(
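
This drop-partition command and the split-partition command that follows both
derive CarbonTablePath from the table's AbsoluteTableIdentifier alone instead
of the (tablePath, carbonTableIdentifier) pair, which is what lets the
separately tracked tableMeta fields go away. The pattern, behind a
hypothetical helper:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.util.path.CarbonStorePath

    // One identifier now carries both the table path and the table identity.
    def schemaFilePath(table: CarbonTable): String =
      CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier).getSchemaFilePath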

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 0973226..841da67 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -69,8 +69,7 @@ case class AlterTableSplitCarbonPartitionCommand(
     val tableName = splitPartitionModel.tableName
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
-    val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val tablePath = relation.tableMeta.tablePath
+    val tablePath = relation.carbonTable.getTablePath
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
@@ -79,7 +78,7 @@ case class AlterTableSplitCarbonPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
     }
-    val table = relation.tableMeta.carbonTable
+    val table = relation.carbonTable
     val partitionInfo = table.getPartitionInfo(tableName)
     val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
    // keep a copy of partitionIdList before updating partitionInfo.
@@ -95,7 +94,7 @@ case class AlterTableSplitCarbonPartitionCommand(
 
     updatePartitionInfo(partitionInfo, partitionIds)
 
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -150,16 +149,12 @@ case class AlterTableSplitCarbonPartitionCommand(
       locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
         locksToBeAcquired)(sparkSession)
       val carbonLoadModel = new CarbonLoadModel()
-      val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
-        .asInstanceOf[CarbonRelation]
-      val tablePath = relation.tableMeta.tablePath
-      val table = relation.tableMeta.carbonTable
-      val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val tablePath = table.getTablePath
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setDatabaseName(table.getDatabaseName)
       carbonLoadModel.setTablePath(tablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)

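Both partition commands now populate CarbonLoadModel straight from the CarbonTable instead of the removed TableMeta/CarbonTableIdentifier pair. Condensed from the hunks above, the pattern is:

    val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
    val carbonLoadModel = new CarbonLoadModel()
    // schema, names and path all come from the same CarbonTable instance
    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
    carbonLoadModel.setTableName(table.getTableName)
    carbonLoadModel.setDatabaseName(table.getDatabaseName)
    carbonLoadModel.setTablePath(table.getTablePath)
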
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
index 224304a..903e93b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
@@ -41,10 +41,9 @@ private[sql] case class ShowCarbonPartitionsCommand(
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
-    val tableName = carbonTable.getFactTableName
+      .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val carbonTable = relation.carbonTable
+    val tableName = carbonTable.getTableName
     val partitionInfo = carbonTable.getPartitionInfo(
       carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
     if (partitionInfo == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dd002f0..3854f76 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -73,7 +73,7 @@ case class CreatePreAggregateTableCommand(
     // getting the parent table
     val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
     // getting the table name
-    val parentTableName = parentTable.getFactTableName
+    val parentTableName = parentTable.getTableName
     // getting the db name of parent table
     val parentDbName = parentTable.getDatabaseName
 
@@ -85,9 +85,8 @@ case class CreatePreAggregateTableCommand(
     tableModel.dataMapRelation = Some(fieldRelationMap)
     CarbonCreateTableCommand(tableModel).run(sparkSession)
     try {
-      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation( tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
-      val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+      val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+      val tableInfo = table.getTableInfo
      // child schema object which will be updated on the parent table
       val childSchema = tableInfo.getFactTable
         .buildChildSchema(dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,

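This file also picks up the rename that runs through the whole commit: CarbonTable.getFactTableName becomes getTableName (the CarbonTable.java hunk further down shows the returned value is unchanged). For illustration, a qualified name is now built as:

    // getTableName still resolves to the identifier's table name, as before
    val qualifiedName = s"${parentTable.getDatabaseName}.${parentTable.getTableName}"
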
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 506a405..f64deec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -84,7 +84,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
         s"Cannot change data type for columns in pre-aggregate table ${ carbonTable.getDatabaseName
-        }.${ carbonTable.getFactTableName }")
+        }.${ carbonTable.getTableName }")
     }
   }
 }
@@ -102,7 +102,7 @@ object PreAggregateAddColumnsPreListener extends OperationEventListener {
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
         s"Cannot add columns in pre-aggreagate table ${ carbonTable.getDatabaseName
-        }.${ carbonTable.getFactTableName }")
+        }.${ carbonTable.getTableName }")
     }
   }
 }
@@ -185,7 +185,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
     }
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
-        carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
+        carbonTable.getDatabaseName}.${ carbonTable.getTableName }")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 3193310..1647f9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -326,21 +326,19 @@ object PreAggregateUtil {
     var numberOfCurrentChild: Int = 0
     try {
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
+        carbonTable.getAbsoluteTableIdentifier)
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(thriftTableInfo,
-          dbName,
-          tableName,
-          carbonTable.getTablePath)
+      val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+        thriftTableInfo,
+        dbName,
+        tableName,
+        carbonTable.getTablePath)
       numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
@@ -374,7 +372,7 @@ object PreAggregateUtil {
   def updateSchemaInfo(carbonTable: CarbonTable,
       thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
     val dbName = carbonTable.getDatabaseName
-    val tableName = carbonTable.getFactTableName
+    val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
       .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
@@ -435,31 +433,30 @@ object PreAggregateUtil {
   def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     carbonTable.getTableLastUpdatedTime
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
-      metastore
-        .revertTableSchemaForPreAggCreationFailure(carbonTable.getAbsoluteTableIdentifier,
-          thriftTable)(sparkSession)
+      metastore.revertTableSchemaForPreAggCreationFailure(
+        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
     }
   }
 
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    metaStore.getTableFromMetadataCache(databaseName, tableName) match {
-      case Some(tableMeta) => Some(tableMeta.carbonTable)
-      case None => try {
+    val carbonTable = metaStore.getTableFromMetadataCache(databaseName, tableName)
+    if (carbonTable.isEmpty) {
+      try {
         Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
           .asInstanceOf[CarbonRelation].metaData.carbonTable)
       } catch {
         case _: Exception =>
           None
       }
+    } else {
+      carbonTable
     }
   }
 

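getChildCarbonTable now consults the metastore cache first and only falls back to a full lookupRelation, turning lookup failures into None. An equivalent formulation of the new control flow, as a sketch:

    def getChildCarbonTable(databaseName: String, tableName: String)
      (sparkSession: SparkSession): Option[CarbonTable] = {
      val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
      // fast path: the table is already materialized in the metadata cache
      metaStore.getTableFromMetadataCache(databaseName, tableName).orElse {
        try {
          Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
            .asInstanceOf[CarbonRelation].metaData.carbonTable)
        } catch {
          case _: Exception => None // the child table may not exist yet
        }
      }
    }
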
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 2132131..3b39334 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -57,9 +57,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
      // older carbon table and this can lead to an inconsistent state in the system. Therefore,
      // lookup relation should be called after acquiring the lock
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val alterTableAddColumnListener = AlterTableAddColumnPreEvent(carbonTable,
         alterTableAddColumnsModel)
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index e44899e..c24a8e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -51,9 +51,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(carbonTable,
         alterTableDataTypeChangeModel)
       OperationListenerBus.getInstance().fireEvent(alterTableDataTypeChangeListener)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index dae2d7b..721dd0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -53,9 +53,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       val partitionInfo = carbonTable.getPartitionInfo(tableName)
       if (partitionInfo != null) {
         val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f1cce13..e7beedd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -81,9 +81,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       locks = AlterTableUtil
         .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
           sparkSession)
-      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].tableMeta
-      carbonTable = tableMeta.carbonTable
+      carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].carbonTable
      // invalidate data maps for the old table, see CARBON-1690
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
@@ -134,7 +133,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,
-        tableMeta.tablePath)(sparkSession)
+        carbonTable.getTablePath)(sparkSession)
 
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index c126b25..a060833 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -357,11 +357,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
   private def getPartitioning(carbonTable: CarbonTable,
       output: Seq[Attribute]): Partitioning = {
-    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)
     if (info != null) {
       val cols = info.getListOfColumns.asScala
       val sortColumn = carbonTable.
-        getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
+        getDimensionByTableName(carbonTable.getTableName).get(0).getColName
       val numBuckets = info.getNumberOfBuckets
       val bucketColumns = cols.flatMap { n =>
         val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ef2e0a5..d6450c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -51,8 +51,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
         val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(tableIdentifier)(
-            sparkSession)
+          .tableExists(tableIdentifier)(sparkSession)
         if (isCarbonTable) {
           val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
           ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
@@ -155,13 +154,13 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
       case AlterTableSetPropertiesCommand(tableName, properties, isView)
-        if (CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .tableExists(tableName)(sparkSession)) => {
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableName)(sparkSession) => {
         ExecutedCommandExec(AlterTableSetCommand(tableName, properties, isView)) :: Nil
       }
       case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView)
-        if (CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .tableExists(tableName)(sparkSession)) => {
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableName)(sparkSession) => {
         ExecutedCommandExec(AlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
       }
       case _ => Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 9ebf47e..49a57e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.AlterTableRenameCommand
 import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 import org.apache.spark.sql.hive.CarbonRelation
@@ -76,7 +76,6 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
     val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession)
       .asInstanceOf[CarbonRelation]
-      .tableMeta
       .carbonTable
       .isStreamingTable
     if (streaming) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6d80a26..87c919d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
@@ -44,13 +43,12 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
+case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
   // clear the metadata
   def clear(): Unit = {
-    tablesMeta.clear()
+    carbonTables.clear()
   }
 }
 
@@ -80,7 +78,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     System.nanoTime() + ""
   }
 
-  val metadata = MetaData(new ArrayBuffer[TableMeta]())
+  val metadata = MetaData(new ArrayBuffer[CarbonTable]())
 
 
   /**
@@ -98,13 +96,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tables = getTableFromMetadataCache(database, tableName)
     tables match {
       case Some(t) =>
-        CarbonRelation(database, tableName,
-          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+        CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
       case None =>
         readCarbonSchema(absIdentifier) match {
           case Some(meta) =>
             CarbonRelation(database, tableName,
-              CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+              CarbonSparkUtil.createSparkMeta(meta), meta)
           case None =>
             throw new NoSuchTableException(database, tableName)
         }
@@ -151,7 +148,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val operationContext = new OperationContext
     val lookupRelationPostEvent: LookupRelationPostEvent =
       LookupRelationPostEvent(
-        relation.tableMeta.carbonTable,
+        relation.carbonTable,
         sparkSession)
     OperationListenerBus.getInstance.fireEvent(lookupRelationPostEvent, operationContext)
     relation
@@ -164,10 +161,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tableName
    * @return
    */
-  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
-    metadata.tablesMeta
-      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-        c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+  def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
+    metadata.carbonTables
+      .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
+        table.getTableName.equalsIgnoreCase(tableName))
   }
 
   def tableExists(
@@ -187,7 +184,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     true
   }
 
-  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[CarbonTable] = {
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
     val tablePath = identifier.getTablePath
@@ -210,12 +207,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
         .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-      val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
-        identifier.getTablePath,
-        identifier.getTablePath,
-        carbonTable)
-      metadata.tablesMeta += tableMeta
-      Some(tableMeta)
+      metadata.carbonTables += carbonTable
+      Some(carbonTable)
     } else {
       None
     }
@@ -378,10 +371,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getTablePath,
-      absoluteTableIdentifier.getTablePath,
-      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
-    metadata.tablesMeta += tableMeta
+    metadata.carbonTables +=
+      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
   }
 
   /**
@@ -391,10 +382,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tableName
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
-    metadataToBeRemoved match {
-      case Some(tableMeta) =>
-        metadata.tablesMeta -= tableMeta
+    val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
+    carbonTableToBeRemoved match {
+      case Some(carbonTable) =>
+        metadata.carbonTables -= carbonTable
         CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
       case None =>
         if (LOGGER.isDebugEnabled) {
@@ -409,10 +400,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
       wrapperTableInfo.getTableUniqueName)
-    for (i <- metadata.tablesMeta.indices) {
+    for (i <- metadata.carbonTables.indices) {
       if (wrapperTableInfo.getTableUniqueName.equals(
-        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
-        metadata.tablesMeta(i).carbonTable = carbonTable
+        metadata.carbonTables(i).getTableUniqueName)) {
+        metadata.carbonTables(i) = carbonTable
       }
     }
   }
@@ -434,8 +425,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
     try {
-      val tablePath = lookupRelation(tableIdentifier)(sparkSession).
-        asInstanceOf[CarbonRelation].tableMeta.tablePath
+      val tablePath = lookupRelation(tableIdentifier)(sparkSession)
+        .asInstanceOf[CarbonRelation].carbonTable.getTablePath
       val fileType = FileFactory.getFileType(tablePath)
       FileFactory.isFileExist(tablePath, fileType)
     } catch {
@@ -531,13 +522,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
   }
 
   private def refreshCache() {
-    metadata.tablesMeta.clear()
+    metadata.carbonTables.clear()
   }
 
   override def isReadFromHiveMetaStore: Boolean = false
 
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
-    metadata.tablesMeta.map(_.carbonTable)
+    metadata.carbonTables
 
   override def getThriftTableInfo(tablePath: CarbonTablePath)
     (sparkSession: SparkSession): TableInfo = {

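The metastore cache is the heart of this cleanup: MetaData now holds CarbonTable instances directly, so every cache operation works on the table object itself rather than a TableMeta wrapper. The new cache surface, condensed from the hunks above (imports as in the file):

    case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
      // clear the metadata
      def clear(): Unit = carbonTables.clear()
    }

    // case-insensitive lookup by database and table name
    def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] =
      metadata.carbonTables.find(table =>
        table.getDatabaseName.equalsIgnoreCase(database) &&
          table.getTableName.equalsIgnoreCase(tableName))
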
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index dedaf1c..4d4229a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -21,20 +21,17 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
 import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 /**
  * Metastore to store carbonschema in hive
@@ -56,10 +53,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
     if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
-      val meta = new TableMeta(table.getCarbonTableIdentifier,
-        absIdentifier.getTablePath, absIdentifier.getTablePath, table)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
-        CarbonSparkUtil.createSparkMeta(table), meta)
+        CarbonSparkUtil.createSparkMeta(table), table)
     } else {
       super.createCarbonRelation(parameters, absIdentifier, sparkSession)
     }
@@ -107,7 +102,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
       carbonTable.getDatabaseName,
-      carbonTable.getFactTableName)
+      carbonTable.getTableName)
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 357a812..696342f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
+
 
 /**
  * Interface for Carbonmetastore
@@ -140,7 +140,7 @@ trait CarbonMetaStore {
 
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
 
-  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta]
+  def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
 
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 3fb0db0..c48e6e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -761,7 +761,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
           .DEFAULT_MAX_NUMBER_OF_COLUMNS
         )
     }
-    val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+    val isAggregateTable = !relation.carbonRelation.carbonTable.getTableInfo
       .getParentRelationIdentifiers.isEmpty
     // transform logical plan if the load is for aggregate table.
     val childPlan = if (isAggregateTable) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 2c476ed..9187fe2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -29,10 +29,10 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.merger.TableMeta
 
 /**
  * Represents logical plan for one carbon table
@@ -41,7 +41,7 @@ case class CarbonRelation(
     databaseName: String,
     tableName: String,
     var metaData: CarbonMetaData,
-    tableMeta: TableMeta)
+    carbonTable: CarbonTable)
   extends LeafNode with MultiInstanceRelation {
 
   def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
@@ -84,17 +84,17 @@ case class CarbonRelation(
   }
 
   override def newInstance(): LogicalPlan = {
-    CarbonRelation(databaseName, tableName, metaData, tableMeta)
+    CarbonRelation(databaseName, tableName, metaData, carbonTable)
       .asInstanceOf[this.type]
   }
 
-  val dimensionsAttr = {
+  val dimensionsAttr: Seq[AttributeReference] = {
     val sett = new LinkedHashSet(
-      tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
+      carbonTable.getDimensionByTableName(carbonTable.getTableName)
         .asScala.asJava)
     sett.asScala.toSeq.map(dim => {
       val dimval = metaData.carbonTable
-        .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+        .getDimensionByName(metaData.carbonTable.getTableName, dim.getColName)
       val output: DataType = dimval.getDataType.getName.toLowerCase match {
         case "array" =>
           CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
@@ -113,11 +113,10 @@ case class CarbonRelation(
   }
 
   val measureAttr = {
-    val factTable = tableMeta.carbonTable.getFactTableName
+    val factTable = carbonTable.getTableName
     new LinkedHashSet(
-      tableMeta.carbonTable.
-        getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
-        asScala.asJava).asScala.toSeq.map { x =>
+      carbonTable.getMeasureByTableName(carbonTable.getTableName).asScala.asJava).asScala.toSeq
+      .map { x =>
       val metastoreType = metaData.carbonTable.getMeasureByName(factTable, x.getColName)
         .getDataType.getName.toLowerCase match {
         case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
@@ -131,7 +130,7 @@ case class CarbonRelation(
   }
 
   override val output = {
-    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+    val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
       .asScala
     // convert each column to Attribute
     columns.filter(!_.isInvisible).map { column =>
@@ -196,12 +195,11 @@ case class CarbonRelation(
 
   def sizeInBytes: Long = {
     val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
-      tableMeta.carbonTable.getAbsoluteTableIdentifier)
+      carbonTable.getAbsoluteTableIdentifier)
 
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       val tablePath = CarbonStorePath.getCarbonTablePath(
-        tableMeta.storePath,
-        tableMeta.carbonTableIdentifier).getPath
+        carbonTable.getAbsoluteTableIdentifier).getPath
       val fileType = FileFactory.getFileType(tablePath)
       if(FileFactory.isFileExist(tablePath, fileType)) {
         tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime

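With TableMeta gone, CarbonRelation carries the CarbonTable as its fourth constructor argument, and everything that previously reached through tableMeta (dimensions, measures, table path, status timestamps) reads from it directly. Construction reduces to:

    // the Spark-side metaData is derived from the same CarbonTable
    CarbonRelation(databaseName, tableName,
      CarbonSparkUtil.createSparkMeta(carbonTable), carbonTable)
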
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index e587395..b0aecd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events._
 
 /**
@@ -106,15 +107,15 @@ class CarbonSessionCatalog(
       alias: Option[String],
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
     var isRefreshed = false
-    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    val storePath = CarbonProperties.getStorePath
     carbonEnv.carbonMetastore.
       checkSchemasModifiedTimeAndReloadTables()
 
-    val tableMeta = carbonEnv.carbonMetastore
-      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
-        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
-    if (tableMeta.isEmpty || (tableMeta.isDefined &&
-        tableMeta.get.carbonTable.getTableLastUpdatedTime !=
+    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+      carbonDatasourceHadoopRelation.carbonTable.getTableName)
+    if (table.isEmpty || (table.isDefined &&
+        table.get.getTableLastUpdatedTime !=
           carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
       refreshTable(identifier)
       DataMapStoreManager.getInstance().

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index b76b24e..9cc5d86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -42,11 +42,11 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     var databaseLocation = ""
     try {
       databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-        CarbonEnv.getInstance(sparkSession).storePath)
+        CarbonProperties.getStorePath)
     } catch {
       case e: NoSuchDatabaseException =>
         // ignore the exception as exception will be handled by hive command.run
-      databaseLocation = CarbonEnv.getInstance(sparkSession).storePath
+      databaseLocation = CarbonProperties.getStorePath
     }
     // DropHiveDB command will fail if cascade is false and one or more table exists in database
     if (command.cascade && tablesInDB != null) {

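The final hunks show where the store path now comes from: the session-scoped CarbonEnv.storePath is replaced by the process-wide CarbonProperties.getStorePath, so callers no longer need a CarbonEnv instance just to resolve the default database location. A sketch of the fallback shown above, written as an expression:

    val databaseLocation =
      try {
        GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath)
      } catch {
        case _: NoSuchDatabaseException =>
          // the exception is handled later by hive's command.run; fall back to the store path
          CarbonProperties.getStorePath
      }
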

[3/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface

Posted by qi...@apache.org.
[CARBONDATA-1739] Clean up store path interface

This closes #1509


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5fc7f06f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5fc7f06f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5fc7f06f

Branch: refs/heads/master
Commit: 5fc7f06f23e944719b2735b97176d68fe209ad75
Parents: b6777fc
Author: Jacky Li <ja...@qq.com>
Authored: Thu Nov 16 19:41:19 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Fri Nov 17 14:46:19 2017 +0800

----------------------------------------------------------------------
 .../dictionary/ManageDictionaryAndBTree.java    |   2 +-
 .../core/metadata/CarbonMetadata.java           |   2 +-
 .../core/metadata/schema/table/CarbonTable.java |   4 +-
 .../core/mutate/CarbonUpdateUtil.java           |   8 +-
 .../carbondata/core/scan/model/QueryModel.java  |   4 +-
 .../carbondata/core/util/CarbonProperties.java  |   7 +
 .../core/metadata/CarbonMetadataTest.java       |   9 +-
 .../metadata/schema/table/CarbonTableTest.java  |   3 +-
 .../table/CarbonTableWithComplexTypesTest.java  |   2 +-
 .../carbondata/examples/StreamExample.scala     |   4 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   2 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   4 +-
 .../streaming/CarbonStreamRecordReader.java     |  10 +-
 .../streaming/CarbonStreamRecordWriter.java     |   4 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |   6 +-
 .../hadoop/test/util/StoreCreator.java          |   4 +-
 .../presto/impl/CarbonTableReader.java          |   2 +-
 .../presto/util/CarbonDataStoreCreator.scala    |   4 +-
 .../TestPreAggregateTableSelection.scala        |   2 +-
 .../partition/TestDDLForPartitionTable.scala    |   6 +-
 ...ForPartitionTableWithDefaultProperties.scala |   8 +-
 .../carbondata/spark/load/ValidateUtil.scala    |   4 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   2 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../spark/rdd/PartitionSplitter.scala           |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |  32 +----
 .../carbondata/spark/util/DataLoadingUtil.scala |   8 +-
 .../spark/util/GlobalDictionaryUtil.scala       |  12 +-
 .../command/carbonTableSchemaCommon.scala       |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  12 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |  18 ++-
 .../spark/sql/CarbonDataFrameWriter.scala       |   3 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   4 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  16 +--
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  36 ++++-
 .../scala/org/apache/spark/sql/CarbonScan.scala |   6 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  20 +--
 .../command/CarbonCreateTableCommand.scala      |   4 +-
 .../CarbonDescribeFormattedCommand.scala        |  20 +--
 .../command/CarbonDropTableCommand.scala        |   9 +-
 .../datamap/CarbonDataMapShowCommand.scala      |   4 +-
 .../datamap/CarbonDropDataMapCommand.scala      |  31 +++--
 .../AlterTableCompactionCommand.scala           |  12 +-
 .../management/CarbonShowLoadsCommand.scala     |   4 +-
 .../command/management/CleanFilesCommand.scala  |  10 +-
 .../management/DeleteLoadByIdCommand.scala      |   4 +-
 .../DeleteLoadByLoadDateCommand.scala           |   4 +-
 .../management/LoadTableByInsertCommand.scala   |   2 +-
 .../command/management/LoadTableCommand.scala   |  62 ++++-----
 .../command/mutation/DeleteExecution.scala      |  13 +-
 .../command/mutation/HorizontalCompaction.scala |   8 +-
 .../command/mutation/IUDCommonUtil.scala        |   2 +-
 .../mutation/ProjectForDeleteCommand.scala      |   7 +-
 .../mutation/ProjectForUpdateCommand.scala      |  11 +-
 .../AlterTableDropCarbonPartitionCommand.scala  |  19 +--
 .../AlterTableSplitCarbonPartitionCommand.scala |  19 +--
 .../partition/ShowCarbonPartitionsCommand.scala |   7 +-
 .../CreatePreAggregateTableCommand.scala        |   7 +-
 .../preaaggregate/PreAggregateListeners.scala   |   6 +-
 .../preaaggregate/PreAggregateUtil.scala        |  37 +++---
 .../CarbonAlterTableAddColumnCommand.scala      |   4 +-
 .../CarbonAlterTableDataTypeChangeCommand.scala |   4 +-
 .../CarbonAlterTableDropColumnCommand.scala     |   4 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |   7 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   4 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  11 +-
 .../strategy/StreamingTableStrategy.scala       |   3 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  61 ++++-----
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  13 +-
 .../apache/spark/sql/hive/CarbonMetaStore.scala |   4 +-
 .../sql/hive/CarbonPreAggregateRules.scala      |   2 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  26 ++--
 .../spark/sql/hive/CarbonSessionState.scala     |  13 +-
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  36 ++---
 .../org/apache/spark/util/CleanFiles.scala      |   5 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   5 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   4 +-
 .../partition/TestAlterPartitionTable.scala     |  32 ++---
 .../spark/util/AllDictionaryTestCase.scala      |  16 +--
 .../spark/util/DictionaryTestCaseUtil.scala     |   6 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  16 +--
 .../loading/DataLoadProcessBuilder.java         |   6 +-
 .../merger/CarbonCompactionExecutor.java        |   4 +-
 .../processing/merger/CarbonCompactionUtil.java |   4 +-
 .../processing/merger/CarbonDataMergerUtil.java |   8 +-
 .../carbondata/processing/merger/TableMeta.java |  42 ------
 .../spliter/AbstractCarbonQueryExecutor.java    |   4 +-
 .../partition/spliter/RowResultProcessor.java   |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 130 -------------------
 .../processing/store/file/FileData.java         |  52 --------
 .../processing/store/file/FileManager.java      |  59 ---------
 .../store/file/IFileManagerComposite.java       |  57 --------
 .../store/writer/AbstractFactDataWriter.java    |   4 -
 .../store/writer/CarbonDataWriterVo.java        |  65 ----------
 .../util/CarbonDataProcessorUtil.java           |   2 +-
 .../processing/util/CarbonLoaderUtil.java       |   5 +
 .../carbondata/processing/StoreCreator.java     |   4 +-
 .../streaming/segment/StreamSegment.java        |  16 +--
 .../streaming/StreamSinkFactory.scala           |   2 +-
 .../CarbonStreamingQueryListener.scala          |   6 +-
 102 files changed, 423 insertions(+), 911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a6c89e0..f8d2495 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -112,7 +112,7 @@ public class ManageDictionaryAndBTree {
     }
     // clear dictionary cache from LRU cache
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dimension : dimensions) {
       removeDictionaryColumnFromCache(carbonTable.getAbsoluteTableIdentifier(),
           dimension.getColumnId());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 75fe78b..2face7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -120,7 +120,7 @@ public final class CarbonMetadata {
   public CarbonDimension getCarbonDimensionBasedOnColIdentifier(CarbonTable carbonTable,
       String columnIdentifier) {
     List<CarbonDimension> listOfCarbonDims =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dimension : listOfCarbonDims) {
       if (dimension.getColumnId().equals(columnIdentifier)) {
         return dimension;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f76ddc9..ac580cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -347,7 +347,7 @@ public class CarbonTable implements Serializable {
   /**
   * @return the tableName
    */
-  public String getFactTableName() {
+  public String getTableName() {
     return absoluteTableIdentifier.getCarbonTableIdentifier().getTableName();
   }
 
@@ -569,7 +569,7 @@ public class CarbonTable implements Serializable {
   }
 
   public boolean isPartitionTable() {
-    return null != tablePartitionMap.get(getFactTableName());
+    return null != tablePartitionMap.get(getTableName());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 29cf62a..0b531dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -208,7 +208,7 @@ public class CarbonUpdateUtil {
       lockStatus = carbonLock.lockWithRetries();
       if (lockStatus) {
         LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
@@ -257,18 +257,18 @@ public class CarbonUpdateUtil {
         status = true;
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
       }
     } finally {
       if (lockStatus) {
         if (carbonLock.unlock()) {
           LOGGER.info(
                  "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                          + "." + table.getFactTableName());
+                          + "." + table.getTableName());
         } else {
           LOGGER.error(
                   "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status updation");
+                          .getTableName() + " during table status updation");
         }
       }
     }
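
The logging changes above sit inside CarbonUpdateUtil's usual lock discipline. As a sketch of that pattern only, withTableStatusLock below is a hypothetical helper, not a CarbonData API; ICarbonLock and its lockWithRetries/unlock calls are the real interface used in the hunk:

  import org.apache.carbondata.core.locks.ICarbonLock

  // Sketch of the acquire/update/unlock discipline around table status writes.
  def withTableStatusLock(lock: ICarbonLock)(update: => Unit): Boolean = {
    val locked = lock.lockWithRetries()
    if (locked) {
      try update finally lock.unlock()
    }
    locked
  }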

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 66dfa61..67b8681 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -122,7 +122,7 @@ public class QueryModel implements Serializable {
   public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
       CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) {
     QueryModel queryModel = new QueryModel();
-    String factTableName = carbonTable.getFactTableName();
+    String factTableName = carbonTable.getTableName();
     queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier);
 
     fillQueryModel(queryPlan, carbonTable, queryModel, factTableName);
@@ -141,7 +141,7 @@ public class QueryModel implements Serializable {
     if (null != queryPlan.getFilterExpression()) {
       boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
       boolean[] isFilterMeasures =
-          new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
+          new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
       processFilterExpression(queryPlan.getFilterExpression(),
           carbonTable.getDimensionByTableName(factTableName),
           carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures);
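
The flag arrays above are sized straight from the table's own column counts. A minimal sketch of that sizing, using the accessors from the hunk (the helper name emptyFilterFlags is illustrative):

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable

  // Sketch: one flag per dimension/measure; the planner later marks the
  // entries that the filter expression actually touches.
  def emptyFilterFlags(carbonTable: CarbonTable): (Array[Boolean], Array[Boolean]) =
    (new Array[Boolean](carbonTable.getDimensionOrdinalMax),
     new Array[Boolean](carbonTable.getNumberOfMeasures(carbonTable.getTableName)))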

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 678a6f7..436950b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -482,6 +482,13 @@ public final class CarbonProperties {
   }
 
   /**
+   * Return the store path
+   */
+  public static String getStorePath() {
+    return getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
+  }
+
+  /**
    * This method will be used to get the properties value
    *
    * @param key
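
The new static helper lets callers read the store location without an instance lookup or a threaded-through storePath parameter. A minimal sketch of the intended usage:

  import org.apache.carbondata.core.util.CarbonProperties

  // Sketch: read the configured store location directly; the result may be
  // null when CarbonCommonConstants.STORE_LOCATION was never set.
  val storePath: String = CarbonProperties.getStorePath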

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 0de160a..5361fb0 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -94,7 +93,7 @@ public class CarbonMetadataTest {
 
   @Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() {
     String expectedResult = "carbonTestTable";
-    assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getFactTableName());
+    assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName());
   }
 
   @Test public void testGetCarbonTableReturingProperTableWithProperTableUniqueName() {
@@ -171,7 +170,7 @@ public class CarbonMetadataTest {
     carbonDimensions.add(new CarbonDimension(colSchema1, 1, 1, 2, 1));
     carbonDimensions.add(new CarbonDimension(colSchema2, 2, 2, 2, 2));
     new MockUp<CarbonTable>() {
-      @Mock public String getFactTableName() {
+      @Mock public String getTableName() {
         return "carbonTestTable";
       }
 
@@ -200,7 +199,7 @@ public class CarbonMetadataTest {
     colSchema2.setColumnUniqueId("2");
     carbonChildDimensions.add(new CarbonDimension(colSchema3, 1, 1, 2, 1));
     new MockUp<CarbonTable>() {
-      @Mock public String getFactTableName() {
+      @Mock public String getTableName() {
         return "carbonTestTable";
       }
 
@@ -242,7 +241,7 @@ public class CarbonMetadataTest {
     carbonChildDimensions.add(new CarbonDimension(colSchema2, 1, 1, 2, 1));
 
     new MockUp<CarbonTable>() {
-      @Mock public String getFactTableName() {
+      @Mock public String getTableName() {
         return "carbonTestTable";
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
index 8b66233..a47b7fd 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -57,7 +56,7 @@ public class CarbonTableTest extends TestCase {
   }
 
   @Test public void testFactTableNameReturnsProperFactTableName() {
-    assertEquals("carbonTestTable", carbonTable.getFactTableName());
+    assertEquals("carbonTestTable", carbonTable.getTableName());
   }
 
   @Test public void testTableUniqueNameIsProper() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
index e9caf4a..84312cd 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
@@ -55,7 +55,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
   }
 
   @Test public void testFactTableNameReturnsProperFactTableName() {
-    assertEquals("carbonTestTable", carbonTable.getFactTableName());
+    assertEquals("carbonTestTable", carbonTable.getTableName());
   }
 
   @Test public void testTableUniqueNameIsProper() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index 4b59aad..43d545d 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -89,9 +89,7 @@ object StreamExample {
              | """.stripMargin)
       }
 
-      val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-        lookupRelation(Some("default"), streamTableName)(spark).asInstanceOf[CarbonRelation].
-        tableMeta.carbonTable
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
       val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // batch load
       val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
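
The three-line metastore lookup collapses into a single call, as above. For reference, a minimal sketch of the simplified lookup, assuming a SparkSession created through CarbonSession:

  import org.apache.spark.sql.{CarbonEnv, SparkSession}
  import org.apache.carbondata.core.metadata.schema.table.CarbonTable

  // Sketch: resolve a CarbonTable by database and table name in one call.
  def lookup(spark: SparkSession, db: String, table: String): CarbonTable =
    CarbonEnv.getCarbonTable(Some(db), table)(spark)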

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 0aa2974..88d8341 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -364,7 +364,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
       BitSet matchedPartitions = null;
-      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
       if (partitionInfo != null) {
         // prune partitions for filter query on partition table
         matchedPartitions = setMatchedPartitions(null, carbonTable, filter, partitionInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 6e840e2..552455a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -393,7 +393,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Expression filter = getFilterPredicates(job.getConfiguration());
     TableProvider tableProvider = new SingleTableProvider(carbonTable);
     // this will be null in case of corrupt schema file.
-    PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
 
     // prune partitions for filter query on partition table
@@ -787,7 +787,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Expression filter = getFilterPredicates(configuration);
     boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
     boolean[] isFilterMeasures =
-        new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
+        new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
         isFilterMeasures);
     queryModel.setIsFilterDimensions(isFilterDimensions);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index a22461d..bdd7c28 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -153,13 +153,13 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     }
     carbonTable = model.getTable();
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     dimensionCount = dimensions.size();
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     measureCount = measures.size();
     List<CarbonColumn> carbonColumnList =
-        carbonTable.getStreamStorageOrderColumn(carbonTable.getFactTableName());
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
     storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
     isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
     directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
@@ -224,8 +224,8 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   private void initializeFilter() {
 
     List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
     int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
     for (int i = 0; i < dimLensWithComplex.length; i++) {
       dimLensWithComplex[i] = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 7df87e3..fdd0504 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -251,8 +251,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
   private void writeFileHeader() throws IOException {
     List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
     int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
     for (int i = 0; i < dimLensWithComplex.length; i++) {
       dimLensWithComplex[i] = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 630828a..3afad94 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -52,7 +52,7 @@ public class CarbonInputFormatUtil {
     if (columnString != null) {
       columns = columnString.split(",");
     }
-    String factTableName = carbonTable.getFactTableName();
+    String factTableName = carbonTable.getTableName();
     CarbonQueryPlan plan = new CarbonQueryPlan(carbonTable.getDatabaseName(), factTableName);
     // fill dimensions
     // If columns are null, set all dimensions and measures
@@ -120,9 +120,9 @@ public class CarbonInputFormatUtil {
   public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     QueryModel.processFilterExpression(filterExpression, dimensions, measures,
         isFilterDimensions, isFilterMeasures);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index b4145ef..c45f910 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -309,9 +309,9 @@ public class StoreCreator {
     String header = reader.readLine();
     String[] split = header.split(",");
     List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
-    List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName());
+    List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
     allCols.addAll(dims);
-    List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName());
+    List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
     allCols.addAll(msrs);
     Set<String>[] set = new HashSet[dims.size()];
     for (int i = 0; i < set.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 8e6abd4..f72bb7a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -363,7 +363,7 @@ public class CarbonTableReader {
         .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier(), null).getPath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
     config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
-    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getFactTableName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
 
     try {
       CarbonTableInputFormat.setTableInfo(config, tableInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 17a4188..1430baf 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -333,10 +333,10 @@ object CarbonDataStoreCreator {
     val split: Array[String] = header.split(",")
     val allCols: util.List[CarbonColumn] = new util.ArrayList[CarbonColumn]()
     val dims: util.List[CarbonDimension] =
-      table.getDimensionByTableName(table.getFactTableName)
+      table.getDimensionByTableName(table.getTableName)
     allCols.addAll(dims)
     val msrs: List[CarbonMeasure] =
-      table.getMeasureByTableName(table.getFactTableName)
+      table.getMeasureByTableName(table.getTableName)
     allCols.addAll(msrs)
     val set: Array[util.Set[String]] = Array.ofDim[util.Set[String]](dims.size)
     for (i <- set.indices) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 6b435c6..1d41664 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -147,7 +147,7 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
       case logicalRelation:LogicalRelation =>
         if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
           val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-          if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) {
+          if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
             isValidPlan = true
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index 3f99922..df1bd2e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -51,7 +51,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
@@ -74,7 +74,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -101,7 +101,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
         |  'LIST_INFO'='0, 1, (2, 3)')
       """.stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("workgroupcategory"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.STRING)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
index 317e2e2..c17ca00 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
@@ -45,7 +45,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
@@ -68,7 +68,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -96,7 +96,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         |  'DICTIONARY_INCLUDE'='projectenddate')
       """.stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -128,7 +128,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         |  'LIST_INFO'='2017-06-11 , 2017-06-13')
       """.stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTableDate")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.DATE)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index 8eb5101..51e0cc4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -51,8 +51,8 @@ object ValidateUtil {
   def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {
     if (sortScope != null) {
       // Don't support use global sort on partitioned table.
-      if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null &&
-        sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
+      if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null &&
+          sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
         throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " +
           "table.")
       }
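
The guard above still rejects global sort on partitioned tables after the rename. A minimal sketch of the check, where the "GLOBAL_SORT" literal stands in for SortScopeOptions.SortScope.GLOBAL_SORT.toString from the hunk:

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

  // Sketch: a partitioned table (non-null PartitionInfo) may not use GLOBAL_SORT.
  def checkSortScope(table: CarbonTable, sortScope: String): Unit =
    if (table.getPartitionInfo(table.getTableName) != null &&
        "GLOBAL_SORT".equalsIgnoreCase(sortScope)) {
      throw new MalformedCarbonCommandException(
        "Don't support use global sort on partitioned table.")
    }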

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 5c6760a..37ab8c3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -46,7 +46,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     val oldPartitionIds = alterPartitionModel.oldPartitionIds
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val databaseName = carbonTable.getDatabaseName
-    val factTableName = carbonTable.getFactTableName
+    val factTableName = carbonTable.getTableName
     val partitionInfo = carbonTable.getPartitionInfo(factTableName)
 
     override protected def getPartitions: Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 9ca21bc..0fed5a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -548,7 +548,7 @@ class PartitionTableDataLoaderRDD[K, V](
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
       val carbonTable = model.getCarbonDataLoadSchema.getCarbonTable
-      val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+      val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
       val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index a82ea00..2aa5610 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -41,7 +41,7 @@ object PartitionDropper {
     val dropWithData = dropPartitionCallableModel.dropWithData
     val carbonTable = dropPartitionCallableModel.carbonTable
     val dbName = carbonTable.getDatabaseName
-    val tableName = carbonTable.getFactTableName
+    val tableName = carbonTable.getTableName
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val partitionInfo = carbonTable.getPartitionInfo(tableName)
     val partitioner = PartitionFactory.getPartitioner(partitionInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index db664b3..9106cca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -40,7 +40,7 @@ object PartitionSplitter {
      val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel
      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-     val tableName = carbonTable.getFactTableName
+     val tableName = carbonTable.getTableName
      val databaseName = carbonTable.getDatabaseName
      val bucketInfo = carbonTable.getBucketingInfo(tableName)
      var finalSplitStatus = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 1b21e3d..a3572ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -55,7 +55,6 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
@@ -516,7 +515,6 @@ object CommonUtil {
   }
 
   def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel,
-      storePath: String,
       insertOverwrite: Boolean): Unit = {
     val newLoadMetaEntry = new LoadMetadataDetails
     val status = if (insertOverwrite) {
@@ -528,16 +526,13 @@ object CommonUtil {
     // reading the start time of data load.
     val loadStartTime = CarbonUpdateUtil.readCurrentTime
     model.setFactTimeStamp(loadStartTime)
-    CarbonLoaderUtil
-      .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp, false)
+    CarbonLoaderUtil.populateNewLoadMetaEntry(
+      newLoadMetaEntry, status, model.getFactTimeStamp, false)
     val entryAdded: Boolean =
       CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
     if (!entryAdded) {
-      sys
-        .error(s"Failed to add entry in table status for ${ model.getDatabaseName }.${
-          model
-            .getTableName
-        }")
+      sys.error(s"Failed to add entry in table status for " +
+                s"${ model.getDatabaseName }.${model.getTableName}")
     }
   }
 
@@ -856,26 +851,9 @@ object CommonUtil {
       CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
       CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
       new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-        carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+        carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
         segmentIds).collect()
     }
   }
 
-  /**
-   * can be removed with the spark 1.6 removal
-   * @param tableMeta
-   * @return
-   */
-  @deprecated
-  def getTablePath(tableMeta: TableMeta): String = {
-    if (tableMeta.tablePath == null) {
-      tableMeta.storePath + CarbonCommonConstants.FILE_SEPARATOR +
-      tableMeta.carbonTableIdentifier.getDatabaseName +
-      CarbonCommonConstants.FILE_SEPARATOR + tableMeta.carbonTableIdentifier.getTableName
-    }
-    else {
-      tableMeta.tablePath
-    }
-  }
-
 }
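
With the deprecated TableMeta helper removed above, the table path no longer needs to be reassembled from a store path. A minimal sketch of the replacement, relying on the path now carried by CarbonTable itself:

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable

  // Sketch: the path lives on CarbonTable, so no storePath/TableMeta plumbing.
  def pathOf(table: CarbonTable): String = table.getTablePath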

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 35e1e78..84ad85e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -180,10 +180,10 @@ object DataLoadingUtil {
       options: immutable.Map[String, String],
       optionsFinal: mutable.Map[String, String],
       carbonLoadModel: CarbonLoadModel): Unit = {
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTablePath(table.getTablePath)
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
@@ -199,7 +199,7 @@ object DataLoadingUtil {
     val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
     val all_dictionary_path = optionsFinal("all_dictionary_path")
     val column_dict = optionsFinal("columndict")
-    ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName)
+    ValidateUtil.validateDateFormat(dateFormat, table, table.getTableName)
     ValidateUtil.validateSortScope(table, sort_scope)
 
     if (bad_records_logger_enable.toBoolean ||
@@ -236,7 +236,7 @@ object DataLoadingUtil {
         }
       } else {
         if (fileHeader.isEmpty) {
-          fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+          fileHeader = table.getCreateOrderColumn(table.getTableName)
             .asScala.map(_.getColName).mkString(",")
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 975fc9b..0bf2b16 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -676,17 +676,17 @@ object GlobalDictionaryUtil {
    */
   def generateGlobalDictionary(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
+      tablePath: String,
       dataFrame: Option[DataFrame] = None): Unit = {
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
       // create dictionary folder if not exists
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
       val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
       // columns which need to generate global dictionary file
       val dimensions = carbonTable.getDimensionByTableName(
-        carbonTable.getFactTableName).asScala.toArray
+        carbonTable.getTableName).asScala.toArray
       // generate global dict from pre defined column dict file
       carbonLoadModel.initPredefDictMap()
 
@@ -701,7 +701,7 @@ object GlobalDictionaryUtil {
         if (colDictFilePath != null) {
           // generate predefined dictionary
           generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-            dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
+            dimensions, carbonLoadModel, sqlContext, tablePath, dictfolderPath)
         }
         if (headers.length > df.columns.length) {
           val msg = "The number of columns in the file header do not match the " +
@@ -717,7 +717,7 @@ object GlobalDictionaryUtil {
           // select column to push down pruning
           df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
           val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-            requireDimension, storePath, dictfolderPath, false)
+            requireDimension, tablePath, dictfolderPath, false)
           // combine distinct value in a block and partition by column
           val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
             .partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -731,7 +731,7 @@ object GlobalDictionaryUtil {
       } else {
         generateDictionaryFromDictionaryFiles(sqlContext,
           carbonLoadModel,
-          storePath,
+          tablePath,
           carbonTableIdentifier,
           dictfolderPath,
           dimensions,
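
After the parameter rename, callers of generateGlobalDictionary pass the table path rather than the store path. A minimal sketch of the call, with load model construction elided and buildDictionary as an illustrative wrapper:

  import org.apache.spark.sql.SQLContext
  import org.apache.carbondata.processing.loading.model.CarbonLoadModel
  import org.apache.carbondata.spark.util.GlobalDictionaryUtil

  // Sketch: the third argument is now the table path, not the store path.
  def buildDictionary(sqlContext: SQLContext, model: CarbonLoadModel): Unit =
    GlobalDictionaryUtil.generateGlobalDictionary(
      sqlContext, model, model.getCarbonDataLoadSchema.getCarbonTable.getTablePath)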

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 756de6b..2f6b277 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -454,7 +454,7 @@ class TableNewProcessor(cm: TableModel) {
       val field = cm.dimCols.find(keyDim equals _.column).get
       val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
         cm.parentTable.get.getColumnByName(
-          cm.parentTable.get.getFactTableName,
+          cm.parentTable.get.getTableName,
           cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
       } else {
         val encoders = new java.util.ArrayList[Encoding]()
@@ -479,7 +479,7 @@ class TableNewProcessor(cm: TableModel) {
         val encoders = if (cm.parentTable.isDefined &&
                            cm.dataMapRelation.get.get(field).isDefined) {
           cm.parentTable.get.getColumnByName(
-            cm.parentTable.get.getFactTableName,
+            cm.parentTable.get.getTableName,
             cm.dataMapRelation.get.get(field).get.
               columnTableRelation.get.parentColumnName).getEncoder
         } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1ca7456..c12d2ef 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -179,7 +179,7 @@ object CarbonDataRDDFactory {
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
                   s"${ tableForCompaction.getDatabaseName }." +
-                  s"${ tableForCompaction.getFactTableName}")
+                  s"${ tableForCompaction.getTableName}")
               val table: CarbonTable = tableForCompaction
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -204,7 +204,7 @@ object CarbonDataRDDFactory {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
                       s"${ tableForCompaction.getDatabaseName }." +
-                      s"${ tableForCompaction.getFactTableName }")
+                      s"${ tableForCompaction.getTableName }")
                 // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
@@ -216,7 +216,7 @@ object CarbonDataRDDFactory {
                   skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier)
                   LOGGER.error("Compaction request file can not be deleted for table " +
                       s"${ tableForCompaction.getDatabaseName }." +
-                      s"${ tableForCompaction.getFactTableName }")
+                      s"${ tableForCompaction.getTableName }")
                 }
               }
               // ********* check again for all the tables.
@@ -248,7 +248,7 @@ object CarbonDataRDDFactory {
       table: CarbonTable
   ): CarbonLoadModel = {
     val loadModel = new CarbonLoadModel
-    loadModel.setTableName(table.getFactTableName)
+    loadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
     loadModel.setCarbonDataLoadSchema(dataLoadSchema)
@@ -319,7 +319,7 @@ object CarbonDataRDDFactory {
           }
         }
       } else {
-        status = if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+        status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
           loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel)
         } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
           DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
@@ -782,7 +782,7 @@ object CarbonDataRDDFactory {
       dataFrame: Option[DataFrame],
       carbonLoadModel: CarbonLoadModel): RDD[Row] = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
     val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
     val columns = carbonLoadModel.getCsvHeaderColumns

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 1e6a36e..47f5344 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -21,23 +21,20 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
 
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.processing.merger.TableMeta
 
 case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
 
 object CarbonSparkUtil {
 
   def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
-    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getTableName)
         .asScala.map(x => x.getColName) // wf : may be problem
-    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getTableName)
         .asScala.map(x => x.getColName)
     val dictionary =
-      carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
+      carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
             f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
                 !f.getDataType.isComplexType)
@@ -47,10 +44,11 @@ object CarbonSparkUtil {
 
   def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
     val table = CarbonTable.buildFromTableInfo(tableInfo)
-    val meta = new TableMeta(table.getCarbonTableIdentifier,
-      table.getTablePath, tablePath, table)
-    CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
-      CarbonSparkUtil.createSparkMeta(table), meta)
+    CarbonRelation(
+      tableInfo.getDatabaseName,
+      tableInfo.getFactTable.getTableName,
+      CarbonSparkUtil.createSparkMeta(table),
+      table)
   }
 
 }
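
CarbonRelation now carries the CarbonTable directly, so the TableMeta wrapper and its store-path bookkeeping disappear. A minimal sketch of building a relation from a TableInfo under the new signature (relationFor is an illustrative wrapper):

  import org.apache.carbondata.core.metadata.schema.table.TableInfo
  import org.apache.carbondata.spark.util.CarbonSparkUtil
  import org.apache.spark.sql.hive.CarbonRelation

  // Sketch: the relation embeds the CarbonTable itself; callers read
  // relation.carbonTable instead of relation.tableMeta.carbonTable.
  def relationFor(tableInfo: TableInfo, tablePath: String): CarbonRelation =
    CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)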

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 44fbb37..b74576d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.types._
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 
 class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
@@ -58,7 +59,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath
+    val storePath = CarbonProperties.getStorePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 22933f2..72f40ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -50,7 +50,7 @@ case class CarbonDatasourceHadoopRelation(
   lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(paths.head,
     parameters("dbname"), parameters("tablename"))
   lazy val databaseName: String = carbonTable.getDatabaseName
-  lazy val tableName: String = carbonTable.getFactTableName
+  lazy val tableName: String = carbonTable.getTableName
   CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
 
   @transient lazy val carbonRelation: CarbonRelation =
@@ -58,7 +58,7 @@ case class CarbonDatasourceHadoopRelation(
     createCarbonRelation(parameters, identifier, sparkSession)
 
 
-  @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable
+  @transient lazy val carbonTable: CarbonTable = carbonRelation.carbonTable
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c7db436..9d88c4c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -72,7 +72,7 @@ case class CarbonDictionaryDecoder(
     attachTree(this, "execute") {
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
-        (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+        (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier)
       }.toMap
 
       if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
@@ -125,7 +125,7 @@ case class CarbonDictionaryDecoder(
 
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
-      (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+      (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier)
     }.toMap
 
     if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
@@ -323,7 +323,7 @@ object CarbonDictionaryDecoder {
       if (relation.isDefined && canBeDecoded(attr, profile)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension = carbonTable
-          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+          .getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -355,7 +355,7 @@ object CarbonDictionaryDecoder {
       if (relation.isDefined) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension = carbonTable
-          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+          .getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -432,12 +432,12 @@ object CarbonDictionaryDecoder {
       if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
-          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+          carbonTable.getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             !carbonDimension.isComplex) {
-          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+          (carbonTable.getTableName, carbonDimension.getColumnIdentifier,
             carbonDimension)
         } else {
           (null, null, null)
@@ -485,12 +485,12 @@ class CarbonDecoderRDD(
       if (relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
-          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+          carbonTable.getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             !carbonDimension.isComplex()) {
-          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+          (carbonTable.getTableName, carbonDimension.getColumnIdentifier,
             carbonDimension)
         } else {
           (null, null, null)

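The getFactTableName to getTableName rename repeats across these call sites; a hedged sketch of the shared lookup pattern they all follow (decodableDimension is a hypothetical helper name):

    import org.apache.carbondata.core.metadata.encoder.Encoding
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension

    object DecodeSketch {
      // A dimension is decodable here when it is dictionary-encoded but
      // neither direct-dictionary nor complex, mirroring the checks above.
      def decodableDimension(table: CarbonTable, name: String): Option[CarbonDimension] = {
        val dim = table.getDimensionByName(table.getTableName, name)
        Option(dim).filter { d =>
          d.hasEncoding(Encoding.DICTIONARY) &&
          !d.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
          !d.isComplex
        }
      }
    }
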
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 1ee7650..dcfce0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql
 import java.util.Map
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonSessionCatalog}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.internal.CarbonSQLConf
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
-import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationListenerBus}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -41,8 +43,6 @@ class CarbonEnv {
 
   var carbonSessionInfo: CarbonSessionInfo = _
 
-  var storePath: String = _
-
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -74,7 +74,7 @@ class CarbonEnv {
         config.addDefaultCarbonSessionParams()
         carbonMetastore = {
           val properties = CarbonProperties.getInstance()
-          storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+          var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
           if (storePath == null) {
             storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
             properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
@@ -112,6 +112,30 @@ object CarbonEnv {
       carbonEnv
     }
   }
-}
 
+  /**
+   * Return carbon table instance by looking up relation in `sparkSession`
+   */
+  def getCarbonTable(
+      databaseNameOp: Option[String],
+      tableName: String)
+    (sparkSession: SparkSession): CarbonTable = {
+    CarbonEnv
+      .getInstance(sparkSession)
+      .carbonMetastore
+      .lookupRelation(databaseNameOp, tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+  }
 
+  def getCarbonTable(
+      tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): CarbonTable = {
+    CarbonEnv
+      .getInstance(sparkSession)
+      .carbonMetastore
+      .lookupRelation(tableIdentifier)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+  }
+}

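A usage sketch for the two new CarbonEnv.getCarbonTable overloads added above (the database and table names "db" and "t1" are hypothetical):

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    object LookupSketch {
      // Both overloads funnel through the metastore's lookupRelation and
      // return the CarbonTable held by the resulting CarbonRelation.
      def lookup(sparkSession: SparkSession): (CarbonTable, CarbonTable) = {
        val byName = CarbonEnv.getCarbonTable(Some("db"), "t1")(sparkSession)
        val byIdentifier =
          CarbonEnv.getCarbonTable(TableIdentifier("t1", Some("db")))(sparkSession)
        (byName, byIdentifier)
      }
    }
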
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 0806421..99a7c37 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -65,11 +65,11 @@ case class CarbonScan(
       attributesRaw = attributeOut
     }
 
-    val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+    val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
     val colAttr = new Array[Attribute](columns.size())
     attributesRaw.foreach { attr =>
      val column =
-        carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+        carbonTable.getColumnByName(carbonTable.getTableName, attr.name)
       if(column != null) {
         colAttr(columns.indexOf(column)) = attr
        }
@@ -78,7 +78,7 @@ case class CarbonScan(
 
     var queryOrder: Integer = 0
     attributesRaw.foreach { attr =>
-      val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+      val carbonColumn = carbonTable.getColumnByName(carbonTable.getTableName, attr.name)
       if (carbonColumn != null) {
         if (carbonColumn.isDimension()) {
           val dim = new QueryDimension(attr.name)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index fba590e..6331f12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -165,7 +165,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       } else {
         CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(Option(dbName), tableName)(sparkSession)
-        (CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName", parameters)
+        (CarbonProperties.getStorePath + s"/$dbName/$tableName", parameters)
       }
     } catch {
       case ex: NoSuchTableException =>
@@ -199,11 +199,10 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       if (parameters.contains("tablePath")) {
         (parameters("tablePath"), parameters)
       } else if (!sparkSession.isInstanceOf[CarbonSession]) {
-        (CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName, parameters)
+        (CarbonProperties.getStorePath + "/" + dbName + "/" + tableName, parameters)
       } else {
-        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        (relation.tableMeta.tablePath, parameters)
+        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+        (carbonTable.getTablePath, parameters)
       }
     } catch {
       case ex: Exception =>
@@ -235,15 +234,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
     if (tablePathOption.isDefined) {
       val sparkSession = sqlContext.sparkSession
-      val identifier: AbsoluteTableIdentifier =
-        AbsoluteTableIdentifier.from(tablePathOption.get, dbName, tableName)
-      val carbonTable =
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.
-          createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
+      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
 
       if (!carbonTable.isStreamingTable) {
         throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
-                                        s"${carbonTable.getFactTableName} is not a streaming table")
+                                        s"${carbonTable.getTableName} is not a streaming table")
       }
 
       // create sink
@@ -314,8 +309,7 @@ object CarbonSource {
     val tableName: String = properties.getOrElse("tableName", "").toLowerCase
     val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
     val tableInfo: TableInfo = TableNewProcessor(model)
-    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath)
     val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)

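A condensed sketch of the simplified table-path resolution in CarbonSource (resolveTablePath is a hypothetical name; the three branches are per the hunk above):

    import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
    import org.apache.carbondata.core.util.CarbonProperties

    object TablePathSketch {
      // An explicit tablePath parameter wins; non-Carbon sessions derive the
      // path from the store location; otherwise ask the looked-up table.
      def resolveTablePath(
          parameters: Map[String, String],
          dbName: String,
          tableName: String,
          sparkSession: SparkSession): String = {
        if (parameters.contains("tablePath")) {
          parameters("tablePath")
        } else if (!sparkSession.isInstanceOf[CarbonSession]) {
          CarbonProperties.getStorePath + "/" + dbName + "/" + tableName
        } else {
          CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession).getTablePath
        }
      }
    }
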
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index 197b23b..f83766d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 
 case class CarbonCreateTableCommand(
     cm: TableModel,
@@ -37,7 +37,7 @@ case class CarbonCreateTableCommand(
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    val storePath = CarbonProperties.getStorePath
     CarbonEnv.getInstance(sparkSession).carbonMetastore.
       checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
index 7dcad9a..b233c99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
@@ -29,6 +29,7 @@ import org.codehaus.jackson.map.ObjectMapper
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.util.CarbonProperties
 
 private[sql] case class CarbonDescribeFormattedCommand(
     child: SparkPlan,
@@ -68,7 +69,7 @@ private[sql] case class CarbonDescribeFormattedCommand(
       val colComment = field.getComment().getOrElse("null")
       val comment = if (dims.contains(fieldName)) {
         val dimension = relation.metaData.carbonTable.getDimensionByName(
-          relation.tableMeta.carbonTableIdentifier.getTableName, fieldName)
+          relation.carbonTable.getTableName, fieldName)
         if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) {
           colProps.append(fieldName).append(".")
             .append(mapper.writeValueAsString(dimension.getColumnProperties))
@@ -101,12 +102,11 @@ private[sql] case class CarbonDescribeFormattedCommand(
       colProps.toString()
     }
     results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
-    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
-      .getDatabaseName, "")
+    results ++= Seq(("Database Name: ", relation.carbonTable.getDatabaseName, "")
     )
-    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
-    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
-    val carbonTable = relation.tableMeta.carbonTable
+    results ++= Seq(("Table Name: ", relation.carbonTable.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", CarbonProperties.getStorePath, ""))
+    val carbonTable = relation.carbonTable
     // Carbon table support table comment
     val tableComment = carbonTable.getTableInfo.getFactTable.getTableProperties
       .getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "")
@@ -122,14 +122,14 @@ private[sql] case class CarbonDescribeFormattedCommand(
       results ++= Seq(("ADAPTIVE", "", ""))
     }
     results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
-      relation.tableMeta.carbonTableIdentifier.getTableName).asScala
+      relation.carbonTable.getTableName).asScala
       .map(column => column).mkString(","), ""))
     val dimension = carbonTable
-      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+      .getDimensionByTableName(relation.carbonTable.getTableName)
     results ++= getColumnGroups(dimension.asScala.toList)
-    if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+    if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
       results ++=
-      Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+      Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getTableName)
         .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
     }
     results.map {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 0343393..f0a916a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events._
 
@@ -50,12 +50,11 @@ case class CarbonDropTableCommand(
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val absoluteTableIdentifier = AbsoluteTableIdentifier
       .from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
@@ -68,7 +67,7 @@ case class CarbonDropTableCommand(
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       val carbonTable: Option[CarbonTable] =
         catalog.getTableFromMetadataCache(dbName, tableName) match {
-          case Some(tableMeta) => Some(tableMeta.carbonTable)
+          case Some(carbonTable) => Some(carbonTable)
           case None => try {
             Some(catalog.lookupRelation(identifier)(sparkSession)
               .asInstanceOf[CarbonRelation].metaData.carbonTable)
@@ -131,7 +130,7 @@ case class CarbonDropTableCommand(
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     val metadataFilePath =