You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/18 15:29:43 UTC

[09/28] carbondata git commit: [CARBONDATA-1739] Clean up store path interface

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 153b169..07491d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -65,7 +65,7 @@ object AlterTableUtil {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
     // acquire the lock first
-    val table = relation.tableMeta.carbonTable
+    val table = relation.carbonTable
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
@@ -133,7 +133,7 @@ object AlterTableUtil {
       thriftTable: TableInfo)(sparkSession: SparkSession,
       sessionState: CarbonSessionState): Unit = {
     val dbName = carbonTable.getDatabaseName
-    val tableName = carbonTable.getFactTableName
+    val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
       .updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
@@ -232,10 +232,7 @@ object AlterTableUtil {
   def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
-
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -262,9 +259,7 @@ object AlterTableUtil {
   def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -297,9 +292,7 @@ object AlterTableUtil {
   def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = metastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
       carbonTable.getCarbonTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -343,30 +336,27 @@ object AlterTableUtil {
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     var timeStamp = 0L
-    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
     var carbonTable: CarbonTable = null
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       // get the latest carbon table
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
         carbonTable.getCarbonTableIdentifier)
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(thriftTableInfo,
-          dbName,
-          tableName,
-          carbonTable.getTablePath)
+      val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+        thriftTableInfo,
+        dbName,
+        tableName,
+        carbonTable.getTablePath)
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
       schemaEvolutionEntry.setTimeStamp(timeStamp)
-      val thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(
+        wrapperTableInfo, dbName, tableName)
       val tblPropertiesMap: mutable.Map[String, String] =
         thriftTable.fact_table.getTableProperties.asScala
       if (set) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index dcfbaea..c05c0f1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -40,9 +39,7 @@ object CleanFiles {
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
       storePath: String, forceTableClean: Boolean = false): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
     CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 8375762..d682b21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -17,7 +17,6 @@
  package org.apache.spark.util
 
  import org.apache.spark.sql.{CarbonEnv, SparkSession}
- import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -30,9 +29,7 @@ object DeleteSegmentByDate {
   def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
       dateValue: String): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
     CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 9b87504..5b58c8d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -34,9 +34,7 @@ object DeleteSegmentById {
   def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
       segmentIds: Seq[String]): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
     CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 23cba20..287191c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -245,7 +245,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table add partition: List Partition") {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -286,7 +286,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     
     sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
     val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
-    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
     val partitionIds2 = partitionInfo2.getPartitionIds
     val list_info2 = partitionInfo2.getListInfo
     assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -304,7 +304,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table add partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -342,7 +342,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val range_info1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -373,7 +373,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition: List Partition") {
     sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5).map(Integer.valueOf(_)).asJava)
@@ -415,7 +415,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val list_info1 = partitionInfo1.getListInfo
     assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava)
@@ -438,7 +438,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("""ALTER TABLE list_table_country ADD PARTITION ('(Part1, Part2, Part3, Part4)')""".stripMargin)
     sql("""ALTER TABLE list_table_country SPLIT PARTITION(9) INTO ('Part4', 'Part2', '(Part1, Part3)')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
@@ -485,7 +485,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('(One,Two, Three, Four)')""".stripMargin)
     sql("""ALTER TABLE list_table_area SPLIT PARTITION(6) INTO ('One', '(Two, Three )', 'Four')""".stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
     assert(partitionIds == List(0, 1, 3, 4, 5, 7, 8, 9).map(Integer.valueOf(_)).asJava)
@@ -528,7 +528,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -566,7 +566,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val rangeInfo1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -586,7 +586,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition: Range Partition + Bucket") {
     sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -624,7 +624,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val rangeInfo1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -642,7 +642,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
     val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
     val partitionIds2 = partitionInfo2.getPartitionIds
     val rangeInfo2 = partitionInfo2.getRangeInfo
     assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava)
@@ -659,7 +659,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
     val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
-    val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getTableName)
     val partitionIds3 = partitionInfo3.getPartitionIds
     val rangeInfo3 = partitionInfo3.getRangeInfo
     assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava)
@@ -789,7 +789,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("ALTER TABLE carbon_table_default_db ADD PARTITION ('2017')")
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_carbon_table_default_db")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
     assert(partitionIds == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)
@@ -809,7 +809,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     sql("ALTER TABLE carbondb.carbontable ADD PARTITION ('2017')")
 
     val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("carbondb_carbontable")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getFactTableName)
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val range_info1 = partitionInfo1.getRangeInfo
     assert(partitionIds1 == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index a3024be..3f5d8c6 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -43,12 +43,12 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     header: String,
     allDictFilePath: String): CarbonLoadModel = {
     val carbonLoadModel = new CarbonLoadModel
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    val table = relation.tableMeta.carbonTable
+    carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+    carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+    val table = relation.carbonTable
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
@@ -141,10 +141,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   test("Support generate global dictionary from all dictionary files") {
     val header = "id,name,city,age"
     val carbonLoadModel = buildCarbonLoadModel(sampleRelation, null, header, sampleAllDictionaryFile)
-    GlobalDictionaryUtil
-      .generateGlobalDictionary(sqlContext,
-        carbonLoadModel,
-        sampleRelation.tableMeta.storePath)
+    GlobalDictionaryUtil.generateGlobalDictionary(
+      sqlContext, carbonLoadModel, sampleRelation.carbonTable.getTablePath)
 
     DictionaryTestCaseUtil.
       checkDictionary(sampleRelation, "city", "shenzhen")
@@ -156,7 +154,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     GlobalDictionaryUtil
       .generateGlobalDictionary(sqlContext,
       carbonLoadModel,
-      complexRelation.tableMeta.storePath)
+      complexRelation.carbonTable.getTablePath)
 
     DictionaryTestCaseUtil.
       checkDictionary(complexRelation, "channelsId", "1650")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index 930de43..4551120 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -38,9 +38,9 @@ object DictionaryTestCaseUtil {
    * @param value  a value of column
    */
   def checkDictionary(relation: CarbonRelation, columnName: String, value: String) {
-    val table = relation.tableMeta.carbonTable
-    val dimension = table.getDimensionByName(table.getFactTableName, columnName)
-    val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid")
+    val table = relation.carbonTable
+    val dimension = table.getDimensionByName(table.getTableName, columnName)
+    val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getTableName, "uniqueid")
     val  absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
     val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
       dimension.getColumnIdentifier, dimension.getDataType,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index d37a68b..78ae384 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -151,12 +151,12 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       extColFilePath: String,
       csvDelimiter: String = ","): CarbonLoadModel = {
     val carbonLoadModel = new CarbonLoadModel
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    val table = relation.tableMeta.carbonTable
+    carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+    carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+    val table = relation.carbonTable
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
@@ -198,7 +198,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
       header, extColDictFilePath1)
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      extComplexRelation.tableMeta.storePath)
+      extComplexRelation.carbonTable.getTablePath)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
       extComplexRelation, "deviceInformationId", "10086")
@@ -207,7 +207,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
       header, extColDictFilePath2)
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      extComplexRelation.tableMeta.storePath)
+      extComplexRelation.carbonTable.getTablePath)
     // check the old dictionary and whether the new distinct value is generated
     DictionaryTestCaseUtil.checkDictionary(
       extComplexRelation, "deviceInformationId", "10086")
@@ -220,7 +220,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
       header, extColDictFilePath3)
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      extComplexRelation.tableMeta.storePath)
+      extComplexRelation.carbonTable.getTablePath)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
       extComplexRelation, "channelsId", "1421|")
@@ -229,7 +229,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     carbonLoadModel = buildCarbonLoadModel(verticalDelimiteRelation, complexFilePath2,
       header2, extColDictFilePath3, "|")
     GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
-      verticalDelimiteRelation.tableMeta.storePath)
+      verticalDelimiteRelation.carbonTable.getTablePath)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
       verticalDelimiteRelation, "channelsId", "1431,")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 442d93e..71c3dc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -182,9 +182,9 @@ public final class DataLoadProcessBuilder {
         loadModel.getBadRecordsLocation());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     Map<String, String> dateFormatMap =
         CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
     List<DataField> dataFields = new ArrayList<>();
@@ -209,7 +209,7 @@ public final class DataLoadProcessBuilder {
       }
     }
     configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
-    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getTableName()));
     // configuration for one pass load: dictionary server info
     configuration.setUseOnePass(loadModel.getUseOnePass());
     configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index be3572c..65f70a0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -209,7 +209,7 @@ public class CarbonCompactionExecutor {
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dim : dimensions) {
       // check if dimension is deleted
       QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -220,7 +220,7 @@ public class CarbonCompactionExecutor {
 
     List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     for (CarbonMeasure carbonMeasure : measures) {
       // check if measure is deleted
       QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 08b8600..c60bb24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -308,7 +308,7 @@ public class CarbonCompactionUtil {
   public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
       CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
     List<CarbonDimension> masterDimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
     for (CarbonDimension dimension : masterDimensions) {
       Integer value = columnCardinalityMap.get(dimension.getColumnId());
@@ -321,7 +321,7 @@ public class CarbonCompactionUtil {
     }
     // add measures to the column schema list
     List<CarbonMeasure> masterSchemaMeasures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     for (CarbonMeasure measure : masterSchemaMeasures) {
       updatedColumnSchemaList.add(measure.getColumnSchema());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c1df349..8f6d19c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1264,7 +1264,7 @@ public final class CarbonDataMergerUtil {
       lockStatus = carbonLock.lockWithRetries();
       if (lockStatus) {
         LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
@@ -1284,18 +1284,18 @@ public final class CarbonDataMergerUtil {
         }
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
       }
     } finally {
       if (lockStatus) {
         if (carbonLock.unlock()) {
           LOGGER.info(
                  "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                          + "." + table.getFactTableName());
+                          + "." + table.getTableName());
         } else {
           LOGGER.error(
                   "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status updation");
+                          .getTableName() + " during table status updation");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
deleted file mode 100644
index 09dbfff..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.merger;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-public class TableMeta implements Serializable {
-
-  private static final long serialVersionUID = -1749874611119829431L;
-
-  public CarbonTableIdentifier carbonTableIdentifier;
-  public String storePath;
-  public CarbonTable carbonTable;
-  public String tablePath;
-
-  public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, String tablePath,
-      CarbonTable carbonTable) {
-    this.carbonTableIdentifier = carbonTableIdentifier;
-    this.storePath = storePath;
-    this.tablePath = tablePath;
-    this.carbonTable = carbonTable;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index aeddac6..36e022b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -78,7 +78,7 @@ public abstract class AbstractCarbonQueryExecutor {
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dim : dimensions) {
       // check if dimension is deleted
       QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -89,7 +89,7 @@ public abstract class AbstractCarbonQueryExecutor {
 
     List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     for (CarbonMeasure carbonMeasure : measures) {
       // check if measure is deleted
       QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 1db414f..48c5471 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -46,7 +46,7 @@ public class RowResultProcessor {
       SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
     this.segmentProperties = segProp;
-    String tableName = carbonTable.getFactTableName();
+    String tableName = carbonTable.getTableName();
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
             segProp, tableName, tempStoreLocation);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 504e7ec..75fcea3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.processing.store;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,19 +41,15 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
-import org.apache.carbondata.processing.store.file.FileManager;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 
@@ -77,10 +72,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private CarbonFactDataWriter dataWriter;
 
   /**
-   * File manager
-   */
-  private IFileManagerComposite fileManager;
-  /**
    * total number of entries in blocklet
    */
   private int entryCount;
@@ -91,11 +82,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private int pageSize;
 
-  // This variable is true if it is dictionary dimension and its cardinality is lower than
-  // property of CarbonCommonConstants.HIGH_CARDINALITY_VALUE
-  // It decides whether it will do RLE encoding on data page for this dimension
-  private boolean[] rleEncodingForDictDimension;
-  private boolean[] isNoDictionary;
   private long processedDataCount;
   private ExecutorService producerExecutorService;
   private List<Future<Void>> producerExecutorServiceTaskList;
@@ -130,12 +116,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private boolean processingComplete;
 
   /**
-   * boolean to check whether dimension
-   * is of dictionary type or no dictionary type
-   */
-  private boolean[] isDictDimension;
-
-  /**
    * current data format version
    */
   private ColumnarFormatVersion version;
@@ -146,47 +126,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) {
     this.model = model;
     initParameters(model);
-
-    int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
-        + getExpandedComplexColsCount();
-    this.rleEncodingForDictDimension = new boolean[numDimColumns];
-    this.isNoDictionary = new boolean[numDimColumns];
-
-    int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
-    // setting true value for dims of high card
-    for (int i = 0; i < model.getNoDictionaryCount(); i++) {
-      this.isNoDictionary[noDictStartIndex + i] = true;
-    }
-
-    boolean isAggKeyBlock = Boolean.parseBoolean(
-        CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
-            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
-    if (isAggKeyBlock) {
-      int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
-      for (int i = 0; i < model.getTableSpec().getNumSimpleDimensions(); i++) {
-        if (model.getSegmentProperties().getDimensions().get(i).isGlobalDictionaryEncoding()) {
-          this.rleEncodingForDictDimension[i] = true;
-        }
-      }
-
-      if (model.getDimensionCount() < dimLens.length) {
-        int allColsCount = getColsCount(model.getDimensionCount());
-        List<Boolean> rleWithComplex = new ArrayList<Boolean>(allColsCount);
-        for (int i = 0; i < model.getDimensionCount(); i++) {
-          GenericDataType complexDataType = model.getComplexIndexMap().get(i);
-          if (complexDataType != null) {
-            complexDataType.fillAggKeyBlock(rleWithComplex, this.rleEncodingForDictDimension);
-          } else {
-            rleWithComplex.add(this.rleEncodingForDictDimension[i]);
-          }
-        }
-        this.rleEncodingForDictDimension = new boolean[allColsCount];
-        for (int i = 0; i < allColsCount; i++) {
-          this.rleEncodingForDictDimension[i] = rleWithComplex.get(i);
-        }
-      }
-    }
     this.version = CarbonProperties.getInstance().getFormatVersion();
     StringBuffer noInvertedIdxCol = new StringBuffer();
     for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) {
@@ -202,13 +141,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     SortScopeOptions.SortScope sortScope = model.getSortScope();
     this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
 
-    //TODO need to pass carbon table identifier to metadata
-    CarbonTable carbonTable =
-        CarbonMetadata.getInstance().getCarbonTable(
-            model.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + model.getTableName());
-    isDictDimension =
-        CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(model.getTableName()));
-
     // in compaction flow the measure with decimal type will come as spark decimal.
     // need to convert it to byte array.
     if (model.isCompactionFlow()) {
@@ -247,19 +179,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
   }
 
-  private boolean[] arrangeUniqueBlockType(boolean[] aggKeyBlock) {
-    int counter = 0;
-    boolean[] uniqueBlock = new boolean[aggKeyBlock.length];
-    for (int i = 0; i < isDictDimension.length; i++) {
-      if (isDictDimension[i]) {
-        uniqueBlock[i] = aggKeyBlock[counter++];
-      } else {
-        uniqueBlock[i] = false;
-      }
-    }
-    return uniqueBlock;
-  }
-
   private void setComplexMapSurrogateIndex(int dimensionCount) {
     int surrIndex = 0;
     for (int i = 0; i < dimensionCount; i++) {
@@ -283,9 +202,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @throws CarbonDataWriterException
    */
   public void initialise() throws CarbonDataWriterException {
-    fileManager = new FileManager();
-    // todo: the fileManager seems to be useless, remove it later
-    fileManager.setName(new File(model.getStoreLocation()[0]).getName());
     setWritingConfiguration();
   }
 
@@ -412,27 +328,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
   }
 
-  private int getColsCount(int columnSplit) {
-    int count = 0;
-    for (int i = 0; i < columnSplit; i++) {
-      GenericDataType complexDataType = model.getComplexIndexMap().get(i);
-      if (complexDataType != null) {
-        count += complexDataType.getColsCount();
-      } else count++;
-    }
-    return count;
-  }
-
   // return the number of complex column after complex columns are expanded
   private int getExpandedComplexColsCount() {
     return model.getExpandedComplexColsCount();
   }
 
-  // return the number of complex column
-  private int getComplexColumnCount() {
-    return model.getComplexIndexMap().size();
-  }
-
   /**
    * below method will be used to close the handler
    */
@@ -519,7 +419,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dataWriter = getFactDataWriter();
     // initialize the channel;
     this.dataWriter.initializeWriter();
-    //initializeColGrpMinMax();
   }
 
   /**
@@ -571,14 +470,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
     carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
     carbonDataWriterVo.setTableName(model.getTableName());
-    carbonDataWriterVo.setFileManager(fileManager);
-    carbonDataWriterVo.setRleEncodingForDictDim(rleEncodingForDictDimension);
-    carbonDataWriterVo.setIsComplexType(isComplexTypes());
     carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
     carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
     carbonDataWriterVo.setDatabaseName(model.getDatabaseName());
     carbonDataWriterVo.setWrapperColumnSchemaList(model.getWrapperColumnSchema());
-    carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
     carbonDataWriterVo.setCarbonDataDirectoryPath(model.getCarbonDataDirectoryPath());
     carbonDataWriterVo.setColCardinality(model.getColCardinality());
     carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
@@ -590,31 +485,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return carbonDataWriterVo;
   }
 
-  private boolean[] isComplexTypes() {
-    int noDictionaryCount = model.getNoDictionaryCount();
-    int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + getComplexColumnCount();
-    int allColsCount = getColsCount(noOfColumn);
-    boolean[] isComplexType = new boolean[allColsCount];
-
-    List<Boolean> complexTypesList = new ArrayList<Boolean>(allColsCount);
-    for (int i = 0; i < noOfColumn; i++) {
-      GenericDataType complexDataType = model.getComplexIndexMap().get(i - noDictionaryCount);
-      if (complexDataType != null) {
-        int count = complexDataType.getColsCount();
-        for (int j = 0; j < count; j++) {
-          complexTypesList.add(true);
-        }
-      } else {
-        complexTypesList.add(false);
-      }
-    }
-    for (int i = 0; i < allColsCount; i++) {
-      isComplexType[i] = complexTypesList.get(i);
-    }
-
-    return isComplexType;
-  }
-
   /**
    * This method will reset the block processing count
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
deleted file mode 100644
index ddd9bf2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-
-public class FileData extends FileManager {
-
-  /**
-   * Store Path
-   */
-  private String storePath;
-
-  /**
-   * hierarchyValueWriter
-   */
-
-  public FileData(String fileName, String storePath) {
-    this.fileName = fileName;
-    this.storePath = storePath;
-  }
-
-  /**
-   * @return Returns the carbonDataFileTempPath.
-   */
-  public String getFileName() {
-    return fileName;
-  }
-
-  /**
-   * @return Returns the storePath.
-   */
-  public String getStorePath() {
-    return storePath;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
deleted file mode 100644
index cfa3a66..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class FileManager implements IFileManagerComposite {
-  /**
-   * listOfFileData, composite parent which holds the different objects
-   */
-  protected List<IFileManagerComposite> listOfFileData =
-      new ArrayList<IFileManagerComposite>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-  protected String fileName;
-
-  @Override public void add(IFileManagerComposite customData) {
-    listOfFileData.add(customData);
-  }
-
-  @Override public void remove(IFileManagerComposite customData) {
-    listOfFileData.remove(customData);
-
-  }
-
-  @Override public IFileManagerComposite get(int i) {
-    return listOfFileData.get(i);
-  }
-
-  @Override public void setName(String name) {
-    this.fileName = name;
-  }
-
-  /**
-   * Return the size
-   */
-  public int size() {
-    return listOfFileData.size();
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
deleted file mode 100644
index 6691772..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-public interface IFileManagerComposite {
-  /**
-   * Add the data which can be either row Folder(Composite) or File
-   *
-   * @param customData
-   */
-  void add(IFileManagerComposite customData);
-
-  /**
-   * Remove the CustomData type object from the IFileManagerComposite object hierarchy.
-   *
-   * @param customData
-   */
-  void remove(IFileManagerComposite customData);
-
-  /**
-   * get the CustomData type object name
-   *
-   * @return CustomDataIntf type
-   */
-  IFileManagerComposite get(int i);
-
-  /**
-   * set the CustomData type object name
-   *
-   * @param name
-   */
-  void setName(String name);
-
-  /**
-   * Get the size
-   *
-   * @return
-   */
-  int size();
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 1b6ba72..855ec03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -60,7 +60,6 @@ import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
-import org.apache.carbondata.processing.store.file.FileData;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.io.IOUtils;
@@ -317,9 +316,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
             dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(),
             "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
-    String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    FileData fileData = new FileData(actualFileNameVal, chosenTempLocation);
-    dataWriterVo.getFileManager().add(fileData);
     this.carbonDataFileTempPath = chosenTempLocation + File.separator
         + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
     this.fileCount++;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 26fff09..79cdd95 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -22,7 +22,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 
 /**
  * Value object for writing the data
@@ -35,12 +34,6 @@ public class CarbonDataWriterVo {
 
   private String tableName;
 
-  private IFileManagerComposite fileManager;
-
-  private boolean[] rleEncodingForDictDim;
-
-  private boolean[] isComplexType;
-
   private int NoDictionaryCount;
 
   private CarbonDataFileAttributes carbonDataFileAttributes;
@@ -49,8 +42,6 @@ public class CarbonDataWriterVo {
 
   private List<ColumnSchema> wrapperColumnSchemaList;
 
-  private boolean[] isDictionaryColumn;
-
   private String carbonDataDirectoryPath;
 
   private int[] colCardinality;
@@ -110,48 +101,6 @@ public class CarbonDataWriterVo {
   }
 
   /**
-   * @return the fileManager
-   */
-  public IFileManagerComposite getFileManager() {
-    return fileManager;
-  }
-
-  /**
-   * @param fileManager the fileManager to set
-   */
-  public void setFileManager(IFileManagerComposite fileManager) {
-    this.fileManager = fileManager;
-  }
-
-  /**
-   * @return the rleEncodingForDictDim
-   */
-  public boolean[] getRleEncodingForDictDim() {
-    return rleEncodingForDictDim;
-  }
-
-  /**
-   * @param rleEncodingForDictDim the rleEncodingForDictDim to set
-   */
-  public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) {
-    this.rleEncodingForDictDim = rleEncodingForDictDim;
-  }
-
-  /**
-   * @return the isComplexType
-   */
-  public boolean[] getIsComplexType() {
-    return isComplexType;
-  }
-
-  /**
-   * @param isComplexType the isComplexType to set
-   */
-  public void setIsComplexType(boolean[] isComplexType) {
-    this.isComplexType = isComplexType;
-  }
-
-  /**
    * @return the noDictionaryCount
    */
   public int getNoDictionaryCount() {
@@ -208,20 +157,6 @@ public class CarbonDataWriterVo {
   }
 
   /**
-   * @return the isDictionaryColumn
-   */
-  public boolean[] getIsDictionaryColumn() {
-    return isDictionaryColumn;
-  }
-
-  /**
-   * @param isDictionaryColumn the isDictionaryColumn to set
-   */
-  public void setIsDictionaryColumn(boolean[] isDictionaryColumn) {
-    this.isDictionaryColumn = isDictionaryColumn;
-  }
-
-  /**
    * @return the carbonDataDirectoryPath
    */
   public String getCarbonDataDirectoryPath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index ca40830..7218a12 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -383,7 +383,7 @@ public final class CarbonDataProcessorUtil {
    */
   public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) {
     Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    String factTableName = schema.getCarbonTable().getFactTableName();
+    String factTableName = schema.getCarbonTable().getTableName();
     if (tableName.equals(factTableName)) {
       List<CarbonDimension> dimensions =
           schema.getCarbonTable().getDimensionByTableName(factTableName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 29a979d..db3442e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -263,6 +263,11 @@ public final class CarbonLoaderUtil {
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+    String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+    FileType fileType = FileFactory.getFileType(metadataPath);
+    if (!FileFactory.isFileExist(metadataPath, fileType)) {
+      FileFactory.mkdirs(metadataPath, fileType);
+    }
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 58cc019..e09e3db 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -297,9 +297,9 @@ public class StoreCreator {
     String header = reader.readLine();
     String[] split = header.split(",");
     List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
-    List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName());
+    List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
     allCols.addAll(dims);
-    List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName());
+    List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
     allCols.addAll(msrs);
     Set<String>[] set = new HashSet[dims.size()];
     for (int i = 0; i < set.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 943858d..7682437 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -66,7 +66,7 @@ public class StreamSegment {
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
-            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
                 + " for stream table get or create segment");
 
         LoadMetadataDetails[] details =
@@ -104,17 +104,17 @@ public class StreamSegment {
       } else {
         LOGGER.error(
             "Not able to acquire the lock for stream table get or create segment for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
         throw new IOException("Failed to get stream segment");
       }
     } finally {
       if (carbonLock.unlock()) {
         LOGGER.info("Table unlocked successfully after stream table get or create segment" + table
-            .getDatabaseName() + "." + table.getFactTableName());
+            .getDatabaseName() + "." + table.getTableName());
       } else {
         LOGGER.error(
             "Unable to unlock table lock for stream table" + table.getDatabaseName() + "." + table
-                .getFactTableName() + " during stream table get or create segment");
+                .getTableName() + " during stream table get or create segment");
       }
     }
   }
@@ -132,7 +132,7 @@ public class StreamSegment {
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
-            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
                 + " for stream table finish segment");
 
         LoadMetadataDetails[] details =
@@ -165,17 +165,17 @@ public class StreamSegment {
       } else {
         LOGGER.error(
             "Not able to acquire the lock for stream table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
         throw new IOException("Failed to get stream segment");
       }
     } finally {
       if (carbonLock.unlock()) {
         LOGGER.info(
             "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                + "." + table.getFactTableName());
+                + "." + table.getTableName());
       } else {
         LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-            .getFactTableName() + " during table status updation");
+            .getTableName() + " during table status updation");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 31ed1f6..2c4d35f 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -147,7 +147,7 @@ object StreamSinkFactory {
     val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
     optionsFinal.put("sort_scope", "no_sort")
     if (parameters.get("fileheader").isEmpty) {
-      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
         .asScala.map(_.getColName).mkString(","))
     }
     val carbonLoadModel = new CarbonLoadModel()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index 6ee3296..c2789f4 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -42,14 +42,14 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
         LockUsage.STREAMING_LOCK)
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
-                    carbonTable.getFactTableName)
+                    carbonTable.getTableName)
         cache.put(event.id, lock)
       } else {
         LOGGER.error("Not able to acquire the lock for stream table:" +
-                     carbonTable.getDatabaseName + "." + carbonTable.getFactTableName)
+                     carbonTable.getDatabaseName + "." + carbonTable.getTableName)
         throw new InterruptedException(
           "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
-          carbonTable.getFactTableName)
+          carbonTable.getTableName)
       }
     }
   }