You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/29 01:41:22 UTC

carbondata git commit: [CARBONDATA-1820] Extract CarbonTable.buildUniqueName method and refactor code to invoke this method

Repository: carbondata
Updated Branches:
  refs/heads/master 445615fea -> 7e124f4f1


[CARBONDATA-1820] Extract CarbonTable.buildUniqueName method and refactor code to invoke this method

Extract the CarbonTable.buildUniqueName method and refactor existing code to invoke this method

This closes #1579


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7e124f4f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7e124f4f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7e124f4f

Branch: refs/heads/master
Commit: 7e124f4f1dfa8fec3db2271e0f3d6b4590b8e02c
Parents: 445615f
Author: QiangCai <qi...@qq.com>
Authored: Tue Nov 28 14:42:51 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 29 09:41:11 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/CarbonMetadata.java           | 21 ++++++++++
 .../ThriftWrapperSchemaConverterImpl.java       |  3 +-
 .../core/metadata/schema/table/CarbonTable.java | 11 +++++
 .../core/metadata/CarbonMetadataTest.java       |  2 +-
 .../metadata/schema/table/TableInfoTest.java    | 12 +++---
 .../hadoop/test/util/StoreCreator.java          |  4 +-
 .../presto/util/CarbonDataStoreCreator.scala    |  5 +--
 .../sdv/generated/MergeIndexTestCase.scala      |  2 +-
 .../dataload/TestLoadDataGeneral.scala          |  3 +-
 .../InsertIntoCarbonTableTestCase.scala         |  5 ++-
 ...ompactionSupportGlobalSortFunctionTest.scala |  3 +-
 ...mpactionSupportGlobalSortParameterTest.scala |  4 +-
 .../MajorCompactionIgnoreInMinorTest.scala      | 26 ++++++++----
 .../MajorCompactionStopsAfterCompaction.scala   | 14 +++++--
 .../dataload/TestBatchSortDataLoad.scala        |  7 +++-
 .../dataload/TestDataLoadWithFileName.scala     |  2 +-
 .../dataload/TestGlobalSortDataLoad.scala       |  4 +-
 .../testsuite/datamap/TestDataMapCommand.scala  | 12 +++---
 .../dataretention/DataRetentionTestCase.scala   | 12 +++++-
 .../partition/TestDDLForPartitionTable.scala    |  6 +--
 ...ForPartitionTableWithDefaultProperties.scala |  8 ++--
 .../command/carbonTableSchemaCommon.scala       |  2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  5 +--
 .../AlterTableDropCarbonPartitionCommand.scala  |  3 +-
 .../AlterTableSplitCarbonPartitionCommand.scala |  3 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  6 +--
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  2 +-
 .../partition/TestAlterPartitionTable.scala     | 44 +++++++++++++-------
 .../bucketing/TableBucketingTestCase.scala      |  4 +-
 .../restructure/AlterTableRevertTestCase.scala  |  2 +-
 .../apache/spark/util/CarbonCommandSuite.scala  |  2 +-
 .../merger/RowResultMergerProcessor.java        |  4 +-
 .../partition/impl/QueryPartitionHelper.java    |  8 ++--
 .../store/CarbonFactDataHandlerModel.java       |  5 +--
 .../util/CarbonDataProcessorUtil.java           |  9 ++--
 .../carbondata/processing/StoreCreator.java     |  4 +-
 36 files changed, 171 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 2face7c..e2ce43a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -58,6 +58,16 @@ public final class CarbonMetadata {
   }
 
   /**
+   * Remove the table information for the given database and table name
+   *
+   * @param databaseName
+   * @param tableName
+   */
+  public void removeTable(String databaseName, String tableName) {
+    removeTable(CarbonTable.buildUniqueName(databaseName, tableName));
+  }
+
+  /**
    * Below method will be used to set the carbon table
    * This method will be used in executor side as driver will always have
    * updated table so from driver during query execution and data loading
@@ -94,6 +104,17 @@ public final class CarbonMetadata {
   }
 
   /**
+   * Below method will be used to get the loaded carbon table
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public CarbonTable getCarbonTable(String databaseName, String tableName) {
+    return getCarbonTable(CarbonTable.buildUniqueName(databaseName, tableName));
+  }
+
+  /**
    * @return the number of tables present in the schema
    */
   public int getNumberOfTables() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 0d60584..408c861 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
@@ -616,7 +617,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         schemaEvolutionList.get(schemaEvolutionList.size() - 1)
             .getTime_stamp());
     wrapperTableInfo.setDatabaseName(dbName);
-    wrapperTableInfo.setTableUniqueName(dbName + "_" + tableName);
+    wrapperTableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName));
     wrapperTableInfo.setTablePath(tablePath);
     wrapperTableInfo.setFactTable(
         fromExternalToWrapperTableSchema(externalTableInfo.getFact_table(), tableName));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 97a9445..66b747b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -361,6 +361,17 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * Build the table unique name.
+   * All callers should use this method to build a table unique name
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public static String buildUniqueName(String databaseName, String tableName) {
+    return databaseName + CarbonCommonConstants.UNDERSCORE + tableName;
+  }
+
+  /**
    * @return the metaDataFilepath
    */
   public String getMetaDataFilepath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 5361fb0..fcef8f4 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -48,7 +48,7 @@ public class CarbonMetadataTest {
   @BeforeClass public static void setUp() {
     carbonMetadata = CarbonMetadata.getInstance();
     carbonMetadata.loadTableMetadata(getTableInfo(10000));
-    tableUniqueName = "carbonTestDatabase_carbonTestTable";
+    tableUniqueName = CarbonTable.buildUniqueName("carbonTestDatabase", "carbonTestTable");
   }
 
   @AfterClass public static void tearDown() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java
index 073328e..811ce00 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java
@@ -26,7 +26,7 @@ public class TableInfoTest extends TestCase {
     private TableInfo tableInfo;
 
     @BeforeClass public void setUp() {
-        tableInfo = getTableInfo("tableInfoTestDatabase_equalsTableInfoTestTable");
+        tableInfo = getTableInfo("tableInfoTestDatabase", "equalsTableInfoTestTable");
     }
 
     @AfterClass public void tearDown() {
@@ -34,17 +34,19 @@ public class TableInfoTest extends TestCase {
     }
 
     @Test public void testTableInfoEquals() {
-        TableInfo cmpEqualsTableInfo = getTableInfo("tableInfoTestDatabase_equalsTableInfoTestTable");
-        TableInfo cmpNotEqualsTableInfo = getTableInfo("tableInfoTestDatabase_notEqualsTableInfoTestTable");
+        TableInfo cmpEqualsTableInfo =
+            getTableInfo("tableInfoTestDatabase", "equalsTableInfoTestTable");
+        TableInfo cmpNotEqualsTableInfo =
+            getTableInfo("tableInfoTestDatabase", "notEqualsTableInfoTestTable");
         assertTrue(tableInfo.equals(cmpEqualsTableInfo));
         assertTrue(!(tableInfo.equals(cmpNotEqualsTableInfo)));
     }
 
-    private TableInfo getTableInfo(String tableUniqueName) {
+    private TableInfo getTableInfo(String databaseName, String tableName) {
         TableInfo info = new TableInfo();
         info.setDatabaseName("tableInfoTestDatabase");
         info.setLastUpdatedTime(1000L);
-        info.setTableUniqueName(tableUniqueName);
+        info.setTableUniqueName(CarbonTable.buildUniqueName(databaseName, tableName));
         return info;
     }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index c45f910..ab8790d 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -272,8 +272,8 @@ public class StoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
-        absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName() + "_"
-            + absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+        absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+    );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
     CarbonTablePath carbonTablePath = CarbonStorePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 1430baf..87f5fa0 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -287,9 +287,8 @@ object CarbonDataStoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol)
     tableSchema.setTableId(UUID.randomUUID().toString)
     tableInfo.setTableUniqueName(
-      absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName +
-      "_" +
-      absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+      absoluteTableIdentifier.getCarbonTableIdentifier.getTableUniqueName
+    )
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
     val carbonTablePath: CarbonTablePath = CarbonStorePath.getCarbonTablePath(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 758c897..cb0d02c 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -107,7 +107,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   private def getIndexFileCount(dbName: String, tableName: String, segment: String): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
     val identifier = carbonTable.getAbsoluteTableIdentifier
     val path = CarbonTablePath
       .getSegmentPath(identifier.getTablePath, segment)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index f51fb18..e3d497a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
@@ -48,7 +49,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
       segmentId: String,
       datbaseName: String,
       tableName: String): Boolean = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName + "_" + tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName, tableName)
     val partitionPath = CarbonStorePath
       .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
     val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index ea1bbfd..09f4451 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
 
 class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -227,7 +228,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("insert overwrite table CarbonOverwrite select * from THive")
     sql("insert overwrite table HiveOverwrite select * from THive")
     checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" +"carbonoverwrite")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite")
     val partitionPath = CarbonStorePath
       .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
     val folder = new File(partitionPath)
@@ -250,7 +251,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
     sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' overwrite INTO TABLE HiveOverwrite")
     checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" +"tcarbonsourceoverwrite")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite")
     val partitionPath = CarbonStorePath
       .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
     val folder = new File(partitionPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 65a50fe..4958f55 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 
 class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -528,7 +529,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
     val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
                 segmentNo
     new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 1de5c73..f9959fa 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -27,8 +27,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.util.path.CarbonTablePath
-
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
 
 class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -531,7 +531,7 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
     val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
                 segmentNo
     new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 9bf916e..ed63fdf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -30,6 +30,8 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.CacheClient
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
 /**
   * FT for compaction scenario where major segment should not be included in minor.
   */
@@ -77,8 +79,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     // delete merged segments
     sql("clean files for table ignoremajor")
 
-    val carbonTable = CarbonMetadata.getInstance()
-      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "ignoremajor"
+    )
     val absoluteTableIdentifier = carbonTable
       .getAbsoluteTableIdentifier
     val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
@@ -110,8 +114,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       case _:Throwable => assert(true)
     }
 
-    val carbonTable = CarbonMetadata.getInstance()
-      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "ignoremajor"
+    )
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
 
     val carbontablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
@@ -130,8 +136,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       "delete from table ignoremajor where segment.starttime before " +
         " '2222-01-01 19:35:01'"
     )
-    val carbonTable = CarbonMetadata.getInstance()
-      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "ignoremajor"
+    )
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbontablePath = CarbonStorePath
       .getCarbonTablePath(absoluteTableIdentifier).getMetadataDirectoryPath
@@ -170,8 +178,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     )
     sql("alter table testmajor compact 'major'")
 
-    val carbonTable = CarbonMetadata.getInstance()
-      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "testmajor")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "testmajor"
+    )
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
       absoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 02560d9..7363838 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -26,6 +26,8 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
 /**
   * FT for compaction scenario where major compaction will only compact the segments which are
   * present at the time of triggering the compaction.
@@ -78,8 +80,10 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     var status = false
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
-      val carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "stopmajor")
+      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+        CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+        "stopmajor"
+      )
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
 
       val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
@@ -110,8 +114,10 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     // delete merged segments
     sql("clean files for table stopmajor")
 
-    val carbonTable = CarbonMetadata.getInstance()
-      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "stopmajor")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "stopmajor"
+    )
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
 
     val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index ae25894..4af9d54 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
 
 class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
@@ -188,8 +189,10 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
   }
 
   def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val carbonTable = CarbonMetadata.getInstance()
-      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      tableName
+    )
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index 44bb2dd..dae0962 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -47,7 +47,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     val testData = s"$resourcesPath/sample.csv"
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table test_table_v3")
     val indexReader = new CarbonIndexFileReader()
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_test_table_v3")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0")
     val carbonIndexPaths = new File(segmentDir)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 2031592..6bbc763 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
 
 class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -329,7 +329,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default"+"_"+tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 5db0a0f..d61971e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -33,7 +33,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
   test("test datamap create") {
     sql("create datamap datamap1 on table datamaptest using 'new.class'")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
     assert(dataMapSchemaList.size() == 1)
@@ -43,7 +43,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
   test("test datamap create with dmproperties") {
     sql("create datamap datamap2 on table datamaptest using 'new.class' dmproperties('key'='value')")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
     assert(dataMapSchemaList.size() == 2)
@@ -57,7 +57,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       sql(
         "create datamap datamap2 on table datamaptest using 'new.class' dmproperties('key'='value')")
     }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
     assert(dataMapSchemaList.size() == 2)
@@ -67,7 +67,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap if exists datamap3 on table datamaptest")
     sql(
       "create datamap datamap3 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
     assert(dataMapSchemaList.size() == 3)
@@ -82,7 +82,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
         "create datamap datamap2 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")
 
     }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
     assert(dataMapSchemaList.size() == 3)
@@ -93,7 +93,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       sql("drop table datamap3")
 
     }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
     assert(dataMapSchemaList.size() == 3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index 2c6178d..99a729c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
 /**
  * This class contains data retention feature test cases
  */
@@ -67,9 +69,15 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
       "phonetype String, serialname String, salary int) stored by 'org.apache.carbondata.format'"
 
     )
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME+"_"+ "retentionlock")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "retentionlock"
+    )
     absoluteTableIdentifierForLock = carbonTable.getAbsoluteTableIdentifier
-    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME+"_"+ "dataRetentionTable")
+    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "dataRetentionTable"
+    )
     absoluteTableIdentifierForRetention = carbonTable2.getAbsoluteTableIdentifier
     carbonTablePath = CarbonStorePath
       .getCarbonTablePath(absoluteTableIdentifierForRetention).getMetadataDirectoryPath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index df1bd2e..2a9a84b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -50,7 +50,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
       """.stripMargin)
 
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "hashTable")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
@@ -73,7 +73,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
         |  'RANGE_INFO'='2017-06-11 00:00:02, 2017-06-13 23:59:59', 'DICTIONARY_INCLUDE'='doj')
       """.stripMargin)
 
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "rangeTable")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
@@ -100,7 +100,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES('PARTITION_TYPE'='LIST',
         |  'LIST_INFO'='0, 1, (2, 3)')
       """.stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "listTable")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("workgroupcategory"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
index c17ca00..1f9c61d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
@@ -44,7 +44,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
       """.stripMargin)
 
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "hashTable")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
@@ -67,7 +67,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         |  'RANGE_INFO'='2017-06-11 00:00:02, 2017-06-13 23:59:59','DICTIONARY_INCLUDE'='doj')
       """.stripMargin)
 
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "rangeTable")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
@@ -95,7 +95,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         |  'LIST_INFO'='2017-06-11 00:00:02, 2017-06-13 23:59:59',
         |  'DICTIONARY_INCLUDE'='projectenddate')
       """.stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "listTable")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
@@ -127,7 +127,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         | TBLPROPERTIES('PARTITION_TYPE'='LIST',
         |  'LIST_INFO'='2017-06-11 , 2017-06-13')
       """.stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTableDate")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "listTableDate")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index d64544d..844f6f7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -625,7 +625,7 @@ class TableNewProcessor(cm: TableModel) {
     tableSchema.setListOfColumns(allColumns.asJava)
     tableSchema.setSchemaEvalution(schemaEvol)
     tableInfo.setDatabaseName(cm.databaseName)
-    tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName)
+    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(cm.databaseName, cm.tableName))
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
     tableInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f384d28..351d765 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -665,9 +665,8 @@ object CarbonDataRDDFactory {
   private def writeDictionary(carbonLoadModel: CarbonLoadModel,
       result: Option[DictionaryServer], writeAll: Boolean): Unit = {
     // write dictionary file
-    val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
-      carbonLoadModel.getTableName
-    }"
+    val uniqueTableName: String =
+      CarbonTable.buildUniqueName(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
     result match {
       case Some(server) =>
         try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index 5a0e4cc..db87fc8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -70,7 +71,7 @@ case class AlterTableDropCarbonPartitionCommand(
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
-    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 841da67..21871f3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetad
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -74,7 +75,7 @@ case class AlterTableSplitCarbonPartitionCommand(
       sys.error(s"Table $dbName.$tableName does not exist")
     }
     carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
-    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 64f2a51..e99a1a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -211,7 +211,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-      val tableUniqueName = dbName + "_" + tableName
+      val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
       val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl
       val wrapperTableInfo = schemaConverter
@@ -402,7 +402,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     carbonTableToBeRemoved match {
       case Some(carbonTable) =>
         metadata.carbonTables -= carbonTable
-        CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+        CarbonMetadata.getInstance.removeTable(dbName, tableName)
       case None =>
         if (LOGGER.isDebugEnabled) {
           LOGGER.debug(s"No entry for table $tableName in database $dbName")
@@ -458,7 +458,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
     val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
       .getMetadataDirectoryPath
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 30d8ccc..f98a53a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -70,7 +70,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     (sparkSession: SparkSession): Unit = {
     val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 287191c..1115a21 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -244,7 +244,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table add partition: List Partition") {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_area")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
@@ -285,7 +285,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     intercept[Exception]  { sql("""ALTER TABLE DROP PARTITION(0) WITH DATA""")}
     
     sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
-    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_area")
     val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
     val partitionIds2 = partitionInfo2.getPartitionIds
     val list_info2 = partitionInfo2.getListInfo
@@ -303,7 +303,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table add partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""")
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "range_table_logdate")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
@@ -341,7 +341,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(result_after5, result_origin5)
 
     sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
-    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable(
+      "default",
+      "range_table_logdate"
+    )
     val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val range_info1 = partitionInfo1.getRangeInfo
@@ -372,7 +375,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table split partition: List Partition") {
     sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_country")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
@@ -414,7 +417,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(result_after5, result_origin5)
 
     sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
-    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_country")
     val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val list_info1 = partitionInfo1.getListInfo
@@ -437,7 +440,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition with different List Sequence: List Partition") {
     sql("""ALTER TABLE list_table_country ADD PARTITION ('(Part1, Part2, Part3, Part4)')""".stripMargin)
     sql("""ALTER TABLE list_table_country SPLIT PARTITION(9) INTO ('Part4', 'Part2', '(Part1, Part3)')""".stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_country")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
@@ -484,7 +487,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   test("Alter table split partition with extra space in New SubList: List Partition") {
     sql("""ALTER TABLE list_table_area ADD PARTITION ('(One,Two, Three, Four)')""".stripMargin)
     sql("""ALTER TABLE list_table_area SPLIT PARTITION(6) INTO ('One', '(Two, Three )', 'Four')""".stripMargin)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "list_table_area")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
@@ -527,7 +530,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table split partition: Range Partition") {
     sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      "default",
+      "range_table_logdate_split"
+    )
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
@@ -565,7 +571,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(result_after5, result_origin5)
 
     sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
-    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable(
+      "default",
+      "range_table_logdate_split"
+    )
     val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val rangeInfo1 = partitionInfo1.getRangeInfo
@@ -585,7 +594,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   test("Alter table split partition: Range Partition + Bucket") {
     sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "range_table_bucket")
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val rangeInfo = partitionInfo.getRangeInfo
@@ -623,7 +632,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(result_after5, result_origin5)
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
-    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default", "range_table_bucket")
     val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val rangeInfo1 = partitionInfo1.getRangeInfo
@@ -641,7 +650,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(result_after6, result_origin6)
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
-    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default", "range_table_bucket")
     val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
     val partitionIds2 = partitionInfo2.getPartitionIds
     val rangeInfo2 = partitionInfo2.getRangeInfo
@@ -658,7 +667,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(result_origin7, result_after7)
 
     sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
-    val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default", "range_table_bucket")
     val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getTableName)
     val partitionIds3 = partitionInfo3.getPartitionIds
     val rangeInfo3 = partitionInfo3.getRangeInfo
@@ -788,7 +797,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     sql("ALTER TABLE carbon_table_default_db ADD PARTITION ('2017')")
 
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_carbon_table_default_db")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      "default",
+      "carbon_table_default_db"
+    )
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
@@ -808,7 +820,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     sql("ALTER TABLE carbondb.carbontable ADD PARTITION ('2017')")
 
-    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("carbondb_carbontable")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("carbondb", "carbontable")
     val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getTableName)
     val partitionIds1 = partitionInfo1.getPartitionIds
     val range_info1 = partitionInfo1.getRangeInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index cbc7879..102df39 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -52,7 +52,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
         "serialname String, salary Int) STORED BY 'carbondata' TBLPROPERTIES " +
         "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t4")
-    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4")
+    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "t4")
     if (table != null && table.getBucketingInfo("t4") != null) {
       assert(true)
     } else {
@@ -67,7 +67,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
         "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t10")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
-    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t10")
+    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "t10")
     if (table != null && table.getBucketingInfo("t10") != null) {
       assert(true)
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 00170e2..9a6efbe 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -90,7 +90,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       intercept[AnalysisException] {
         sql("select newField from reverttest")
       }
-      val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default_reverttest")
+      val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default", "reverttest")
 
       assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index d53ac43..1b528f9 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -117,7 +117,7 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
     createAndLoadTestTable(table, "csv_table")
     DeleteSegmentById.main(Array(s"${location}", table, "0"))
     CleanFiles.main(Array(s"${location}", table))
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_"+table)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
     val tablePath = carbonTable.getAbsoluteTableIdentifier.getTablePath
     val f = new File(s"$tablePath/Fact/Part0")
     assert(f.isDirectory)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index f82f365..fbe2c85 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -23,7 +23,6 @@ import java.util.PriorityQueue;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
@@ -62,8 +61,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
     this.segprop = segProp;
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
 
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-            .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
             tempStoreLocation);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
index 4fb2414..79e9e5a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.processing.partition.DataPartitioner;
 import org.apache.carbondata.processing.partition.Partition;
@@ -46,7 +47,8 @@ public final class QueryPartitionHelper {
    * Get partitions applicable for query based on filters applied in query
    */
   public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
-    String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
+    String tableUniqueName =
+        CarbonTable.buildUniqueName(queryPlan.getDatabaseName(), queryPlan.getTableName());
 
     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
 
@@ -54,7 +56,7 @@ public final class QueryPartitionHelper {
   }
 
   public List<Partition> getAllPartitions(String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
+    String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
 
     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
 
@@ -65,7 +67,7 @@ public final class QueryPartitionHelper {
    * Get the node name where the partition is assigned to.
    */
   public String getLocation(Partition partition, String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
+    String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
 
     DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
     return loadBalancer.getNodeForPartitions(partition);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 4f9458c..37b585d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -182,8 +181,8 @@ public class CarbonFactDataHandlerModel {
       }
     }
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        identifier.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + identifier
-            .getTableName());
+        identifier.getDatabaseName(), identifier.getTableName());
+
     List<ColumnSchema> wrapperColumnSchema = CarbonUtil
         .getColumnSchemaList(carbonTable.getDimensionByTableName(identifier.getTableName()),
             carbonTable.getMeasureByTableName(identifier.getTableName()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index b64362e..a18147a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -200,8 +200,7 @@ public final class CarbonDataProcessorUtil {
     String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator);
     String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length];
 
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
       String tmpStore = baseTmpStorePathArray[i];
       CarbonTablePath carbonTablePath =
@@ -412,8 +411,7 @@ public final class CarbonDataProcessorUtil {
     for (int i = 0; i < type.length; i++) {
       type[i] = DataTypes.DOUBLE;
     }
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
     for (int i = 0; i < type.length; i++) {
       type[i] = measures.get(i).getDataType();
@@ -436,8 +434,7 @@ public final class CarbonDataProcessorUtil {
    */
   public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
       String databaseName, String tableName, String partitionId, String segmentId) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
     CarbonTablePath carbonTablePath =
         CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e124f4f/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index e09e3db..8c2f7bb 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -258,8 +258,8 @@ public class StoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
-        absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName() + "_"
-            + absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+        absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+    );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);