Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 16:59:43 UTC

[5/7] carbondata git commit: [CARBONDATA-1573] [Integration] Support database location configuration while creating a database / support creation of a Carbon table in the database location
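
In summary, this commit re-roots table addressing from the store path to the table path: DictionaryColumnUniqueIdentifier is now built from an AbsoluteTableIdentifier, the dictionary reader/writer constructors take only that identifier, and the input formats carry the database and table names in the Hadoop Configuration instead of parsing them out of the input path. A minimal sketch of the new construction pattern, recombined from the hunks below (storePath and columnIdentifier are assumed to be set up as in the tests, and imports follow the packages shown in the hunks):

    CarbonTableIdentifier tableId =
        new CarbonTableIdentifier("testdb", "testtable", UUID.randomUUID().toString());
    // Tables now live under <storePath>/<databaseName>/<tableName>, and the
    // identifier is anchored at that table path rather than at the store root:
    AbsoluteTableIdentifier absoluteId =
        new AbsoluteTableIdentifier(storePath + "/testdb/testtable", tableId);
    DictionaryColumnUniqueIdentifier dictId =
        new DictionaryColumnUniqueIdentifier(absoluteId, columnIdentifier,
            columnIdentifier.getDataType(),
            CarbonStorePath.getCarbonTablePath(absoluteId));
    // Readers and writers no longer take a separate store path and table identifier:
    CarbonDictionaryWriter writer = new CarbonDictionaryWriterImpl(dictId);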

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
index 8bb2052..86ba9c8 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 import java.util.UUID;
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -65,6 +66,8 @@ public class CarbonDictionaryWriterImplTest {
 
   private CarbonTableIdentifier carbonTableIdentifier;
 
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
   private String databaseName;
 
   private String tableName;
@@ -100,8 +103,9 @@ public class CarbonDictionaryWriterImplTest {
     this.carbonStorePath = props.getProperty("storePath", "carbonStore");
     this.columnIdentifier = new ColumnIdentifier("Name", null, null);
     carbonTableIdentifier = new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString());
+    absoluteTableIdentifier = new AbsoluteTableIdentifier(carbonStorePath, carbonTableIdentifier);
     this.dictionaryColumnUniqueIdentifier =
-        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier,
+        new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,
             columnIdentifier.getDataType(),
             CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier));
     deleteStorePath();
@@ -183,8 +187,7 @@ public class CarbonDictionaryWriterImplTest {
    */
   private CarbonDictionaryWriterImpl prepareWriter() throws IOException {
     initDictionaryDirPaths();
-    return new CarbonDictionaryWriterImpl(this.carbonStorePath, carbonTableIdentifier,
-        dictionaryColumnUniqueIdentifier);
+    return new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
   }
 
   /**
@@ -438,8 +441,7 @@ public class CarbonDictionaryWriterImplTest {
    */
   private List<CarbonDictionaryColumnMetaChunk> readDictionaryMetadataFile() throws IOException {
     CarbonDictionaryMetadataReaderImpl columnMetadataReaderImpl =
-        new CarbonDictionaryMetadataReaderImpl(this.carbonStorePath, this.carbonTableIdentifier,
-            this.dictionaryColumnUniqueIdentifier);
+        new CarbonDictionaryMetadataReaderImpl(this.dictionaryColumnUniqueIdentifier);
     List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunkList = null;
     // read metadata file
     try {
@@ -457,8 +459,7 @@ public class CarbonDictionaryWriterImplTest {
   private List<byte[]> readDictionaryFile(long dictionaryStartOffset, long dictionaryEndOffset)
       throws IOException {
     CarbonDictionaryReaderImpl dictionaryReader =
-        new CarbonDictionaryReaderImpl(this.carbonStorePath, this.carbonTableIdentifier,
-            this.dictionaryColumnUniqueIdentifier);
+        new CarbonDictionaryReaderImpl(this.dictionaryColumnUniqueIdentifier);
     List<byte[]> dictionaryValues = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     try {
       if (0 == dictionaryEndOffset) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
index d04d8a2..e64726a 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
@@ -45,6 +46,7 @@ public class CarbonDictionarySortIndexWriterImplTest {
 
   private String storePath;
   private CarbonTableIdentifier carbonTableIdentifier = null;
+  private AbsoluteTableIdentifier absoluteTableIdentifier = null;
   private ColumnIdentifier columnIdentifier = null;
   private CarbonDictionaryWriter dictionaryWriter = null;
   private CarbonDictionarySortIndexWriter dictionarySortIndexWriter = null;
@@ -54,15 +56,21 @@ public class CarbonDictionarySortIndexWriterImplTest {
     storePath = "target/carbonStore";
     carbonTableIdentifier =
         new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString());
+    String tablePath =
+        storePath + "/" + carbonTableIdentifier.getDatabaseName() + "/" + carbonTableIdentifier
+            .getTableName();
+    absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, carbonTableIdentifier);
     columnIdentifier = new ColumnIdentifier("Name", null, null);
-    DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, columnIdentifier.getDataType(),
-        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
+    DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+        new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,
+            columnIdentifier.getDataType(),
+            CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier));
     dictionaryWriter =
-        new CarbonDictionaryWriterImpl(storePath, carbonTableIdentifier, dictionaryColumnUniqueIdentifier);
+        new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
     dictionarySortIndexWriter =
-        new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath);
+        new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
     carbonDictionarySortIndexReader =
-        new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath);
+        new CarbonDictionarySortIndexReaderImpl(dictionaryColumnUniqueIdentifier);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 4c5b359..8be1e2e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -31,9 +31,9 @@ public class CacheClient {
   private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
       segmentAccessClient;
 
-  public CacheClient(String storePath) {
+  public CacheClient() {
     Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
-        CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE, storePath);
+        CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
     segmentAccessClient = new CacheAccessClient<>(segmentCache);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index e5aac84..0aa2974 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -82,7 +82,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -96,7 +95,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Carbon Input format class representing one carbon table
@@ -116,7 +114,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
-
+  private static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+  private static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
@@ -313,11 +312,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
     String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
-    }
-    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+    return AbsoluteTableIdentifier
+        .from(dirs, getDatabaseName(configuration), getTableName(configuration));
   }
 
   /**
@@ -331,7 +327,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    */
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-    CacheClient cacheClient = new CacheClient(identifier.getStorePath());
+    CacheClient cacheClient = new CacheClient();
     try {
       List<String> invalidSegments = new ArrayList<>();
       List<UpdateVO> invalidTimestampsList = new ArrayList<>();
@@ -727,7 +723,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public BlockMappingVO getBlockRowCount(JobContext job,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException, KeyGenException {
-    CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+    CacheClient cacheClient = new CacheClient();
     try {
       SegmentUpdateStatusManager updateStatusManager =
           new SegmentUpdateStatusManager(absoluteTableIdentifier);
@@ -978,4 +974,24 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return new String[] { "0" };
   }
 
+  public static void setDatabaseName(Configuration configuration, String databaseName) {
+    if (null != databaseName) {
+      configuration.set(DATABASE_NAME, databaseName);
+    }
+  }
+
+  public static String getDatabaseName(Configuration configuration) {
+    return configuration.get(DATABASE_NAME);
+  }
+
+  public static void setTableName(Configuration configuration, String tableName) {
+    if (null != tableName) {
+      configuration.set(TABLE_NAME, tableName);
+    }
+  }
+
+  public static String getTableName(Configuration configuration) {
+    return configuration.get(TABLE_NAME);
+  }
+
 }
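
The new DATABASE_NAME and TABLE_NAME keys make the table identity explicit in the job configuration. A hedged usage sketch (the job variable is assumed); note that the setters silently ignore null values, and in this class the getters simply return null when a key was never set:

    Configuration conf = job.getConfiguration();
    CarbonInputFormat.setDatabaseName(conf, "testdb");
    CarbonInputFormat.setTableName(conf, "testtable");
    // getAbsoluteTableIdentifier(conf) combines these with INPUT_DIR:
    String db = CarbonInputFormat.getDatabaseName(conf);    // "testdb"
    String table = CarbonInputFormat.getTableName(conf);    // "testtable"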

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f3963ad..6e840e2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
@@ -88,7 +89,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -98,7 +98,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Input format of CarbonData file.
@@ -123,6 +122,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
   private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+  public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+  public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
 
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
@@ -288,12 +289,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
   private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
+    String tablePath = configuration.get(INPUT_DIR, "");
+    try {
+      return AbsoluteTableIdentifier
+          .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+    } catch (InvalidConfigurationException e) {
+      throw new IOException(e);
     }
-    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
   }
 
   /**
@@ -941,4 +943,34 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     }
     return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
   }
+
+  public static void setDatabaseName(Configuration configuration, String databaseName) {
+    if (null != databaseName) {
+      configuration.set(DATABASE_NAME, databaseName);
+    }
+  }
+
+  public static String getDatabaseName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String databaseName = configuration.get(DATABASE_NAME);
+    if (null == databaseName) {
+      throw new InvalidConfigurationException("Database name is not set.");
+    }
+    return databaseName;
+  }
+
+  public static void setTableName(Configuration configuration, String tableName) {
+    if (null != tableName) {
+      configuration.set(TABLE_NAME, tableName);
+    }
+  }
+
+  public static String getTableName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String tableName = configuration.get(TABLE_NAME);
+    if (tableName == null) {
+      throw new InvalidConfigurationException("Table name is not set");
+    }
+    return tableName;
+  }
 }
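
CarbonTableInputFormat exposes the same pair of keys, but here the getters throw a checked InvalidConfigurationException when a key is missing, which getAbsoluteTableIdentifier above converts to an IOException. A caller-side sketch under those signatures (tablePath is assumed):

    Configuration conf = job.getConfiguration();
    CarbonTableInputFormat.setDatabaseName(conf, "testdb");
    CarbonTableInputFormat.setTableName(conf, "testtable");
    try {
      AbsoluteTableIdentifier identifier = AbsoluteTableIdentifier.from(
          tablePath,
          CarbonTableInputFormat.getDatabaseName(conf),
          CarbonTableInputFormat.getTableName(conf));
    } catch (InvalidConfigurationException e) {
      throw new IOException(e);  // the input format wraps it the same way
    }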

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 66a06ba..1b875bc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -99,8 +99,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
         DataMapDistributable distributable = (DataMapDistributable)inputSplit;
-        AbsoluteTableIdentifier identifier =
-            AbsoluteTableIdentifier.fromTablePath(distributable.getTablePath());
         TableDataMap dataMap = DataMapStoreManager.getInstance()
             .getDataMap(identifier, distributable.getDataMapName(),
                 distributable.getDataMapFactoryClass());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index 4268ee2..37796db 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -98,7 +98,7 @@ class InMemoryBTreeIndex implements Index {
   private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
       JobContext job, AbsoluteTableIdentifier identifier) throws IOException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
-    CacheClient cacheClient = new CacheClient(identifier.getStorePath());
+    CacheClient cacheClient = new CacheClient();
     TableSegmentUniqueIdentifier segmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(identifier, segment.getId());
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
index 32d879f..f4927dd 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
@@ -61,10 +61,10 @@ public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
           .hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) {
         CacheProvider cacheProvider = CacheProvider.getInstance();
         Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
-            .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+            .createCache(CacheType.FORWARD_DICTIONARY);
         dataTypes[i] = carbonColumns[i].getDataType();
         dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
-            absoluteTableIdentifier.getCarbonTableIdentifier(),
+            absoluteTableIdentifier,
             carbonColumns[i].getColumnIdentifier(), dataTypes[i],
             CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
index eb07f7e..e6feb93 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
@@ -96,7 +96,7 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
         boolean isDirectDictionary =
             CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
         DictionaryColumnUniqueIdentifier dictionarIdentifier =
-            new DictionaryColumnUniqueIdentifier(carbontable.getCarbonTableIdentifier(),
+            new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
                 child.getColumnIdentifier(), child.getDataType(),
                 CarbonStorePath.getCarbonTablePath(carbontable.getAbsoluteTableIdentifier()));
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index 4e3e6cf..a22461d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -279,7 +279,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
         fileSplit.getStart() == 0);
 
     cacheProvider = CacheProvider.getInstance();
-    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
     queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
 
     outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection));
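
Across these hunks the cache API is narrowed in the same way: CacheProvider.createCache now takes only the cache type, and dictionary lookups resolve their location from the identifier itself. A minimal sketch (checked exceptions and column setup are elided; the two-argument identifier constructor appears in the hive and presto hunks further down):

    Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardCache =
        CacheProvider.getInstance().createCache(CacheType.FORWARD_DICTIONARY);
    Dictionary dictionary = forwardCache.get(new DictionaryColumnUniqueIdentifier(
        absoluteTableIdentifier, carbonColumn.getColumnIdentifier()));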

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index b4444be..630828a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -82,6 +82,10 @@ public class CarbonInputFormatUtil {
       AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
     CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
+    carbonInputFormat.setDatabaseName(job.getConfiguration(),
+        identifier.getCarbonTableIdentifier().getDatabaseName());
+    carbonInputFormat
+        .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
     return carbonInputFormat;
   }
@@ -90,6 +94,10 @@ public class CarbonInputFormatUtil {
       AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) throws IOException {
     CarbonTableInputFormat<V> carbonTableInputFormat = new CarbonTableInputFormat<>();
     carbonTableInputFormat.setPartitionIdList(job.getConfiguration(), partitionId);
+    carbonTableInputFormat.setDatabaseName(job.getConfiguration(),
+        identifier.getCarbonTableIdentifier().getDatabaseName());
+    carbonTableInputFormat
+        .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
     return carbonTableInputFormat;
   }
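
With these additions, creating an input format through the utility stamps the table identity onto the job before registering the input path, so getSplits() can later rebuild the AbsoluteTableIdentifier without parsing the path. A hedged sketch of the call site (the utility method names are truncated by the hunk headers, so createCarbonTableInputFormat is assumed here):

    Job job = Job.getInstance(new Configuration());
    CarbonTableInputFormat<Object[]> format =
        CarbonInputFormatUtil.createCarbonTableInputFormat(identifier, partitionIdList, job);
    // internally equivalent to:
    //   setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
    //   setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
    //   FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));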

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index f6c9e59..f1ce324 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -61,7 +61,7 @@ public class SchemaReader {
       TableInfo wrapperTableInfo = schemaConverter
           .fromExternalToWrapperTableInfo(tableInfo,
               identifier.getCarbonTableIdentifier().getDatabaseName(), tableName,
-              identifier.getStorePath());
+              identifier.getTablePath());
       wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath));
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
       return CarbonMetadata.getInstance().getCarbonTable(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 865dabe..068d8b3 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -189,6 +189,10 @@ public class CarbonInputMapperTest extends TestCase {
     if (filter != null) {
       CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
     }
+    CarbonInputFormat.setDatabaseName(job.getConfiguration(),
+        abs.getCarbonTableIdentifier().getDatabaseName());
+    CarbonInputFormat.setTableName(job.getConfiguration(),
+        abs.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(abs.getTablePath()));
     CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1"));
     FileOutputFormat.setOutputPath(job, new Path(outPath + "1"));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 3b5d736..b4145ef 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -99,14 +99,15 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 public class StoreCreator {
 
   private static AbsoluteTableIdentifier absoluteTableIdentifier;
+  private static String storePath = null;
 
   static {
     try {
-      String storePath = new File("target/store").getCanonicalPath();
+      storePath = new File("target/store").getCanonicalPath();
       String dbName = "testdb";
       String tableName = "testtable";
       absoluteTableIdentifier =
-          new AbsoluteTableIdentifier(storePath, new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+          new AbsoluteTableIdentifier(storePath + "/testdb/testtable", new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
     } catch (IOException ex) {
 
     }
@@ -126,7 +127,7 @@ public class StoreCreator {
     loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
     loadModel.setFactFilePath(factFilePath);
     loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-    loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
+    loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
     loadModel.setDateFormat(null);
     loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -163,17 +164,17 @@ public class StoreCreator {
     try {
       String factFilePath =
           new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
-      File storeDir = new File(absoluteTableIdentifier.getStorePath());
+      File storeDir = new File(storePath);
       CarbonUtil.deleteFoldersAndFiles(storeDir);
       CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
-          absoluteTableIdentifier.getStorePath());
+          storePath);
 
       CarbonTable table = createTable(absoluteTableIdentifier);
       writeDictionary(factFilePath, table);
       CarbonLoadModel loadModel =
           buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
 
-      executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
+      executeGraph(loadModel, storePath);
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -183,7 +184,7 @@ public class StoreCreator {
   public static CarbonTable createTable(
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
     TableInfo tableInfo = new TableInfo();
-    tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
+    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
     tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
     tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -276,8 +277,7 @@ public class StoreCreator {
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
     CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
+        .getCarbonTablePath(absoluteTableIdentifier);
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
     tableInfo.setMetaDataFilepath(schemaMetadataPath);
@@ -327,34 +327,33 @@ public class StoreCreator {
     }
 
     Cache dictCache = CacheProvider.getInstance()
-        .createCache(CacheType.REVERSE_DICTIONARY, absoluteTableIdentifier.getStorePath());
+        .createCache(CacheType.REVERSE_DICTIONARY);
     for (int i = 0; i < set.length; i++) {
       ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
       DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
-          new DictionaryColumnUniqueIdentifier(table.getCarbonTableIdentifier(), columnIdentifier,
+          new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier,
               columnIdentifier.getDataType(), CarbonStorePath
-              .getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier()));
+              .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+                  table.getCarbonTableIdentifier()));
       CarbonDictionaryWriter writer =
-          new CarbonDictionaryWriterImpl(absoluteTableIdentifier.getStorePath(),
-              absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier);
+          new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
       for (String value : set[i]) {
         writer.write(value);
       }
       writer.close();
       writer.commit();
       Dictionary dict = (Dictionary) dictCache.get(
-          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
+          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
         		  columnIdentifier, dims.get(i).getDataType(),CarbonStorePath
-              .getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier())));
+              .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+                  table.getCarbonTableIdentifier())));
       CarbonDictionarySortInfoPreparator preparator =
           new CarbonDictionarySortInfoPreparator();
       List<String> newDistinctValues = new ArrayList<String>();
       CarbonDictionarySortInfo dictionarySortInfo =
           preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
       CarbonDictionarySortIndexWriter carbonDictionaryWriter =
-          new CarbonDictionarySortIndexWriterImpl(
-              absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier,
-              absoluteTableIdentifier.getStorePath());
+          new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
       try {
         carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
         carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
@@ -379,7 +378,7 @@ public class StoreCreator {
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
     String tempLocationKey = databaseName + '_' + tableName + "_1";
-    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
     CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
     CarbonProperties.getInstance().addProperty("send.signal.load", "false");
     CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
@@ -421,7 +420,7 @@ public class StoreCreator {
 
     CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
     new DataLoadExecutor().execute(loadModel,
-        new String[] {storeLocation},
+        new String[] {storeLocation + "/" + databaseName + "/" + tableName},
         new CarbonIterator[]{readerIterator});
 
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
index 2a19271..2f770cd 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -84,12 +83,11 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T
           .hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) {
         CacheProvider cacheProvider = CacheProvider.getInstance();
         Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
-            .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+            .createCache(CacheType.FORWARD_DICTIONARY);
         dataTypes[i] = carbonColumns[i].getDataType();
         dictionaries[i] = forwardDictionaryCache.get(
-            new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
-                carbonColumns[i].getColumnIdentifier(), dataTypes[i],
-                CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
+            new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+                carbonColumns[i].getColumnIdentifier()));
       } else {
         dataTypes[i] = carbonColumns[i].getDataType();
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 4cbc692..aabd3df 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -77,8 +77,8 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
         }
       }
     }
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.fromTablePath(validInputPath);
+    AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
+        .from(validInputPath, getDatabaseName(configuration), getTableName(configuration));
     // read the schema file to get the absoluteTableIdentifier having the correct table id
     // persisted in the schema
     CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
index 9c1d51e..11839c9 100644
--- a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
+++ b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
@@ -97,7 +97,7 @@ object HiveExample {
     statement
       .execute(
         "ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
-        s"'file:///$store/default/hive_carbon_example' ")
+        s"'file:///$store/hive_carbon_example' ")
 
     val sql = "SELECT * FROM HIVE_CARBON_EXAMPLE"
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index e49dcee..65c7373 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -93,8 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
       Configuration conf = new Configuration();
       conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
       String carbonTablePath = PathFactory.getInstance()
-          .getCarbonTablePath(targetTable.getAbsoluteTableIdentifier().getStorePath(),
-              targetTable.getCarbonTableIdentifier(), null).getPath();
+          .getCarbonTablePath(targetTable.getAbsoluteTableIdentifier(), null).getPath();
 
       conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
       JobConf jobConf = new JobConf(conf);
@@ -123,6 +122,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     try {
       CarbonTableInputFormat
           .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
+      CarbonTableInputFormat
+          .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
+      CarbonTableInputFormat
+          .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
     } catch (IOException e) {
       throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 9839fc8..8e6abd4 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -293,14 +293,20 @@ public class CarbonTableReader {
       // If table is not previously cached, then:
 
       // Step 1: get store path of the table and cache it.
-      String storePath = config.getStorePath();
       // create table identifier. the table id is randomly generated.
       cache.carbonTableIdentifier =
               new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
                       UUID.randomUUID().toString());
+      String storePath = config.getStorePath();
+      String tablePath = storePath + "/" + cache.carbonTableIdentifier.getDatabaseName() + "/"
+          + cache.carbonTableIdentifier.getTableName();
+
       // get the store path of the table.
+
+      AbsoluteTableIdentifier absoluteTableIdentifier =
+          new AbsoluteTableIdentifier(tablePath, cache.carbonTableIdentifier);
       cache.carbonTablePath =
-              PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
+          PathFactory.getInstance().getCarbonTablePath(absoluteTableIdentifier, null);
       // cache the table
       cc.put(table, cache);
 
@@ -325,8 +331,8 @@ public class CarbonTableReader {
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
       // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
       TableInfo wrapperTableInfo = schemaConverter
-              .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-                      storePath);
+          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+              tablePath);
       wrapperTableInfo.setMetaDataFilepath(
               CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
 
@@ -354,9 +360,10 @@ public class CarbonTableReader {
     Configuration config = new Configuration();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = PathFactory.getInstance()
-            .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(),
-                    carbonTable.getCarbonTableIdentifier(), null).getPath();
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier(), null).getPath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getFactTableName());
 
     try {
       CarbonTableInputFormat.setTableInfo(config, tableInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
index b9a9f0d..c8e74a3 100644
--- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
@@ -59,11 +59,11 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
         val cacheProvider: CacheProvider = CacheProvider.getInstance
         val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
           cacheProvider
-            .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath)
+            .createCache(CacheType.FORWARD_DICTIONARY)
         dataTypes(index) = carbonColumn.getDataType
         dictionaries(index) = forwardDictionaryCache
-          .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier
-            .getCarbonTableIdentifier, carbonColumn.getColumnIdentifier))
+          .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+            carbonColumn.getColumnIdentifier))
         dictionarySliceArray(index) = createSliceArrayBlock(dictionaries(index))
 
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index e8f5d6d..17a4188 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -71,16 +71,16 @@ object CarbonDataStoreCreator {
       val dbName: String = "testdb"
       val tableName: String = "testtable"
       val absoluteTableIdentifier = new AbsoluteTableIdentifier(
-        storePath,
+        storePath + "/" + dbName + "/" + tableName,
         new CarbonTableIdentifier(dbName,
           tableName,
           UUID.randomUUID().toString))
       val factFilePath: String = new File(dataFilePath).getCanonicalPath
-      val storeDir: File = new File(absoluteTableIdentifier.getStorePath)
+      val storeDir: File = new File(absoluteTableIdentifier.getTablePath)
       CarbonUtil.deleteFoldersAndFiles(storeDir)
       CarbonProperties.getInstance.addProperty(
         CarbonCommonConstants.STORE_LOCATION_HDFS,
-        absoluteTableIdentifier.getStorePath)
+        absoluteTableIdentifier.getTablePath)
       val table: CarbonTable = createTable(absoluteTableIdentifier)
       writeDictionary(factFilePath, table, absoluteTableIdentifier)
       val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
@@ -95,7 +95,7 @@ object CarbonDataStoreCreator {
         absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
       loadModel.setFactFilePath(factFilePath)
       loadModel.setLoadMetadataDetails(new ArrayList[LoadMetadataDetails]())
-      loadModel.setStorePath(absoluteTableIdentifier.getStorePath)
+      loadModel.setTablePath(absoluteTableIdentifier.getTablePath)
       CarbonProperties.getInstance
         .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true")
 
@@ -131,7 +131,7 @@ object CarbonDataStoreCreator {
       loadModel.setPartitionId("0")
       loadModel.setFactTimeStamp(System.currentTimeMillis())
       loadModel.setMaxColumns("15")
-      executeGraph(loadModel, absoluteTableIdentifier.getStorePath)
+      executeGraph(loadModel, storePath)
     } catch {
       case e: Exception => e.printStackTrace()
 
@@ -140,7 +140,7 @@ object CarbonDataStoreCreator {
 
   private def createTable(absoluteTableIdentifier: AbsoluteTableIdentifier): CarbonTable = {
     val tableInfo: TableInfo = new TableInfo()
-    tableInfo.setStorePath(absoluteTableIdentifier.getStorePath)
+    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     tableInfo.setDatabaseName(
       absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
     val tableSchema: TableSchema = new TableSchema()
@@ -293,7 +293,7 @@ object CarbonDataStoreCreator {
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
     val carbonTablePath: CarbonTablePath = CarbonStorePath.getCarbonTablePath(
-      absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getTablePath,
       absoluteTableIdentifier.getCarbonTableIdentifier)
     val schemaFilePath: String = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath: String =
@@ -351,22 +351,19 @@ object CarbonDataStoreCreator {
       line = reader.readLine()
     }
     val dictCache: Cache[DictionaryColumnUniqueIdentifier, ReverseDictionary] = CacheProvider
-      .getInstance.createCache(CacheType.REVERSE_DICTIONARY,
-      absoluteTableIdentifier.getStorePath)
+      .getInstance.createCache(CacheType.REVERSE_DICTIONARY)
     for (i <- set.indices) {
       val columnIdentifier: ColumnIdentifier =
         new ColumnIdentifier(dims.get(i).getColumnId, null, null)
       val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier =
         new DictionaryColumnUniqueIdentifier(
-          table.getCarbonTableIdentifier,
+          table.getAbsoluteTableIdentifier,
           columnIdentifier,
           columnIdentifier.getDataType,
-          CarbonStorePath.getCarbonTablePath(table.getStorePath,
+          CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier.getTablePath,
             table.getCarbonTableIdentifier)
         )
       val writer: CarbonDictionaryWriter = new CarbonDictionaryWriterImpl(
-        absoluteTableIdentifier.getStorePath,
-        absoluteTableIdentifier.getCarbonTableIdentifier,
         dictionaryColumnUniqueIdentifier)
       for (value <- set(i)) {
         writer.write(value)
@@ -376,10 +373,10 @@ object CarbonDataStoreCreator {
       val dict: Dictionary = dictCache
         .get(
           new DictionaryColumnUniqueIdentifier(
-            absoluteTableIdentifier.getCarbonTableIdentifier,
+            absoluteTableIdentifier,
             columnIdentifier,
             dims.get(i).getDataType,
-            CarbonStorePath.getCarbonTablePath(table.getStorePath,
+            CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier.getTablePath,
               table.getCarbonTableIdentifier)
           ))
         .asInstanceOf[Dictionary]
@@ -391,10 +388,7 @@ object CarbonDataStoreCreator {
           dict,
           dims.get(i).getDataType)
       val carbonDictionaryWriter: CarbonDictionarySortIndexWriter =
-        new CarbonDictionarySortIndexWriterImpl(
-          absoluteTableIdentifier.getCarbonTableIdentifier,
-          dictionaryColumnUniqueIdentifier,
-          absoluteTableIdentifier.getStorePath)
+        new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier)
       try {
         carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
         carbonDictionaryWriter.writeInvertedSortIndex(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
index 8115e27..f49e475 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
@@ -52,7 +52,7 @@ class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll
     localspark.sessionState.asInstanceOf[CarbonSessionState].metadataHive
       .runSqlHive(
         s"ALTER TABLE default.t3 SET SERDEPROPERTIES" +
-        s"('tablePath'='$storeLocation/default/t3')")
+        s"('tablePath'='$storeLocation/default/t3', 'dbname'='default', 'tablename'='t3')")
     localspark.sql("show tables").show()
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 01146ee..2e26d7f 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -25,9 +25,10 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 
 /**
  * Test Class for AlterTableTestCase to verify all scenerios
@@ -106,12 +107,15 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   private def getIndexFileCount(dbName: String, tableName: String, segment: String): Int = {
-    val identifier = AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sqlContext.sparkSession).storePath, dbName, tableName)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName)
+    val identifier = carbonTable.getAbsoluteTableIdentifier
     val path = CarbonTablePath
       .getSegmentPath(identifier.getTablePath, segment)
     val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
-        .INDEX_FILE_EXT)
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath
+          .INDEX_FILE_EXT)
+      }
     })
     if (carbonFiles != null) {
       carbonFiles.length

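The hunk above sets the pattern used throughout the rest of this commit: rather than rebuilding an AbsoluteTableIdentifier from the global store path plus database and table names, the tests look the table up in CarbonMetadata and take the identifier, and therefore the table path, from the table itself. That is what lets a table live outside the default <store>/<db>/<table> layout. A minimal sketch of the lookup, using only calls visible in the hunk (the table must already be registered, e.g. created earlier in the session):

    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Resolve a segment directory through the metadata cache; the cache key
    // is "<dbName>_<tableName>".
    def segmentPath(dbName: String, tableName: String, segment: String): String = {
      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName)
      val identifier = carbonTable.getAbsoluteTableIdentifier
      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment)
    }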
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/resources/dblocation/test.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/dblocation/test.csv b/integration/spark-common-test/src/test/resources/dblocation/test.csv
new file mode 100644
index 0000000..1dceb63
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/dblocation/test.csv
@@ -0,0 +1,6 @@
+c1,c2,c3,c5
+a,1,aa,aaa
+b,2,bb,bbb
+c,3,cc,ccc
+d,4,dd,ddd
+e,5,ee,eee

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 23d1292..a34f479 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -49,8 +49,8 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
       datbaseName: String,
       tableName: String): Boolean = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName + "_" + tableName)
-    val partitionPath = CarbonStorePath.getCarbonTablePath(storeLocation,
-      carbonTable.getCarbonTableIdentifier).getPartitionDir("0")
+    val partitionPath = CarbonStorePath
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
     val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
     val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
     val segments: ArrayBuffer[String] = ArrayBuffer()

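CarbonStorePath.getCarbonTablePath now has a single-argument overload taking the AbsoluteTableIdentifier, replacing the (storePath, CarbonTableIdentifier) pair. A short sketch of resolving a partition directory with it, under the same registered-table assumption as above:

    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.util.path.CarbonStorePath

    // Partition directory for partition "0", derived from the table's own
    // identifier rather than from a global store location.
    def partitionDir(dbName: String, tableName: String): String =
      CarbonStorePath
        .getCarbonTablePath(CarbonMetadata.getInstance()
          .getCarbonTable(dbName + "_" + tableName).getAbsoluteTableIdentifier)
        .getPartitionDir("0")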
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 34981a2..ea1bbfd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -24,6 +24,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
+
 class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   var timeStampPropOrig: String = _
   override def beforeAll {
@@ -224,7 +227,10 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("insert overwrite table CarbonOverwrite select * from THive")
     sql("insert overwrite table HiveOverwrite select * from THive")
     checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val folder = new File(s"$storeLocation/default/carbonoverwrite/Fact/Part0/")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + "carbonoverwrite")
+    val partitionPath = CarbonStorePath
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+    val folder = new File(partitionPath)
     assert(folder.isDirectory)
     assert(folder.list().length == 1)
   }
@@ -244,7 +250,11 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
     sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' overwrite INTO TABLE HiveOverwrite")
     checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val folder = new File(s"$storeLocation/default/tcarbonsourceoverwrite/Fact/Part0/")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + "tcarbonsourceoverwrite")
+    val partitionPath = CarbonStorePath
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+    val folder = new File(partitionPath)
+
     assert(folder.isDirectory)
     assert(folder.list().length == 1)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index c7b39ad..3873b0d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
 
 class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -527,7 +528,9 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + tableName)
+    val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
+                segmentNo
     new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
   }
 }

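Here the segment directory is rebuilt by concatenating onto getTablePath. Under the default layout that string equals what CarbonTablePath.getSegmentPath produces (as used in MergeIndexTestCase above), so an equivalent helper that avoids hard-coding "/Fact/Part0/Segment_" could look like this sketch:

    import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Count the index files in one segment; assumes the default
    // <tablePath>/Fact/Part0/Segment_<n> layout behind getSegmentPath.
    def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
      val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + tableName)
      val store = CarbonTablePath.getSegmentPath(
        carbonTable.getAbsoluteTableIdentifier.getTablePath, segmentNo)
      new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
    }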
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 78c835a..1de5c73 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -28,6 +28,9 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
+
 class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   val filePath: String = s"$resourcesPath/globalsort"
   val file1: String = resourcesPath + "/globalsort/sample1.csv"
@@ -528,7 +531,9 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + tableName)
+    val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
+                segmentNo
     new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index 29f3492..508ca6c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -42,14 +42,12 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "compactionlocktesttable", "1")
       )
   val carbonTablePath: CarbonTablePath = CarbonStorePath
-    .getCarbonTablePath(absoluteTableIdentifier.getStorePath,
-      absoluteTableIdentifier.getCarbonTableIdentifier
-    )
+    .getCarbonTablePath(absoluteTableIdentifier)
   val dataPath: String = carbonTablePath.getMetadataDirectoryPath
 
   val carbonLock: ICarbonLock =
     CarbonLockFactory
-      .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.COMPACTION_LOCK)
+      .getCarbonLockObj(absoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
 
   override def beforeAll {
     CarbonProperties.getInstance()

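The lock factory is re-keyed in the same way: getCarbonLockObj now takes the AbsoluteTableIdentifier instead of the bare CarbonTableIdentifier, so lock files land under the table's actual path. A hedged acquire/release sketch; lockWithRetries and unlock are the usual ICarbonLock calls, and the org.apache.carbondata.core.locks import path is assumed since the test's import block is not shown in this diff:

    import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
    import org.apache.carbondata.core.metadata.CarbonMetadata

    val carbonTable = CarbonMetadata.getInstance()
      .getCarbonTable("default" + "_" + "compactionlocktesttable")
    val carbonLock: ICarbonLock = CarbonLockFactory
      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
    if (carbonLock.lockWithRetries()) {
      try {
        // critical section: compaction would run while the lock is held
      } finally {
        carbonLock.unlock()
      }
    }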
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 111ede7..9bf916e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -76,12 +76,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
   test("delete merged folder and check segments") {
     // delete merged segments
     sql("clean files for table ignoremajor")
-    val identifier = new AbsoluteTableIdentifier(
-          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(
-            CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-        )
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+    val carbonTable = CarbonMetadata.getInstance()
+      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+    val absoluteTableIdentifier = carbonTable
+      .getAbsoluteTableIdentifier
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+      absoluteTableIdentifier)
 
     // merged segment should not be there
     val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
@@ -89,9 +90,8 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
     assert(!segments.contains("3"))
-    val cacheClient = new CacheClient(CarbonProperties.getInstance.
-      getProperty(CarbonCommonConstants.STORE_LOCATION));
-    val segmentIdentifier = new TableSegmentUniqueIdentifier(identifier, "2")
+    val cacheClient = new CacheClient()
+    val segmentIdentifier = new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "2")
     val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentAccessClient.
       getIfPresent(segmentIdentifier)
     assert(null == wrapper)
@@ -109,12 +109,12 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     catch {
       case _:Throwable => assert(true)
     }
-    val carbontablePath = CarbonStorePath
-      .getCarbonTablePath(CarbonProperties.getInstance
-        .getProperty(CarbonCommonConstants.STORE_LOCATION),
-        new CarbonTableIdentifier(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-      )
+
+    val carbonTable = CarbonMetadata.getInstance()
+      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+
+    val carbontablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
       .getMetadataDirectoryPath
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
@@ -130,13 +130,11 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       "delete from table ignoremajor where segment.starttime before " +
         " '2222-01-01 19:35:01'"
     )
+    val carbonTable = CarbonMetadata.getInstance()
+      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbontablePath = CarbonStorePath
-      .getCarbonTablePath(CarbonProperties.getInstance
-        .getProperty(CarbonCommonConstants.STORE_LOCATION),
-        new CarbonTableIdentifier(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-      )
-      .getMetadataDirectoryPath
+      .getCarbonTablePath(absoluteTableIdentifier).getMetadataDirectoryPath
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted for segment 2.
@@ -171,12 +169,12 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
         "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
     )
     sql("alter table testmajor compact 'major'")
-    val identifier = new AbsoluteTableIdentifier(
-      CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(
-        CarbonCommonConstants.DATABASE_DEFAULT_NAME, "testmajor", "ttt")
-    )
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+    val carbonTable = CarbonMetadata.getInstance()
+      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "testmajor")
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+      absoluteTableIdentifier)
 
     // merged segment should not be there
     val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList

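SegmentStatusManager and TableSegmentUniqueIdentifier likewise now accept the AbsoluteTableIdentifier directly, as the hunks above show. A condensed sketch of the post-compaction segment check:

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.statusmanager.SegmentStatusManager

    // After "clean files", segment 2.1 (the merged segment) should be valid
    // and its source segments 2 and 3 should be gone.
    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + "ignoremajor")
    val manager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
    val segments = manager.getValidAndInvalidSegments.getValidSegments.asScala.toList
    assert(segments.contains("2.1") && !segments.contains("2") && !segments.contains("3"))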
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 4976c24..02560d9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -19,7 +19,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
@@ -77,14 +78,12 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     var status = false
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
+      val carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "stopmajor")
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
 
-      val identifier = new AbsoluteTableIdentifier(
-            CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-            new CarbonTableIdentifier(
-              CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
-          )
-
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+        absoluteTableIdentifier)
 
       val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
 //      segments.foreach(seg =>
@@ -111,12 +110,12 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     // delete merged segments
     sql("clean files for table stopmajor")
 
-    val identifier = new AbsoluteTableIdentifier(
-          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
-        )
+    val carbonTable = CarbonMetadata.getInstance()
+      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "stopmajor")
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+      absoluteTableIdentifier)
 
     // merged segment should not be there
     val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index cb3da40..ae25894 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
 
 class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
   var filePath: String = _
@@ -186,8 +188,12 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
   }
 
   def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val store  = storeLocation +"/default/"+ tableName + "/Fact/Part0/Segment_"+segmentNo
-    new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+    val carbonTable = CarbonMetadata.getInstance()
+      .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + tableName)
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+    new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 
   override def afterAll {

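getCarbonDataDirectoryPath(partitionId, segmentId) is the CarbonTablePath accessor for a segment's data directory and replaces the hand-built store path here and in the two data-load tests below. A compact version of the helper above, with the identifier fetched once and reused:

    import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.util.path.CarbonStorePath

    // Count the .carbonindex files written for one segment of a table in the
    // default database.
    def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
      val identifier = CarbonMetadata.getInstance()
        .getCarbonTable("default" + "_" + tableName).getAbsoluteTableIdentifier
      val segmentDir = CarbonStorePath.getCarbonTablePath(identifier)
        .getCarbonDataDirectoryPath("0", segmentNo)
      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
    }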
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index c4152a1..44bb2dd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -22,10 +22,12 @@ import java.io.{File, FilenameFilter}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.reader.CarbonIndexFileReader
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.metadata.CarbonMetadata
+
 class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
   var originVersion = ""
 
@@ -45,7 +47,10 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     val testData = s"$resourcesPath/sample.csv"
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table test_table_v3")
     val indexReader = new CarbonIndexFileReader()
-    val carbonIndexPaths = new File(s"$storeLocation/default/test_table_v3/Fact/Part0/Segment_0/")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_test_table_v3")
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0")
+    val carbonIndexPaths = new File(segmentDir)
       .listFiles(new FilenameFilter {
         override def accept(dir: File, name: String): Boolean = {
           name.endsWith(CarbonTablePath.getCarbonIndexExtension)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 5d0c055..9f941f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -29,6 +29,9 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
+
 class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   var filePath: String = s"$resourcesPath/globalsort"
 
@@ -236,11 +239,13 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
         | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_update")
-    sql("UPDATE carbon_localsort_update SET (name) = ('bb') WHERE id = 2").show
 
+    sql("UPDATE carbon_localsort_update SET (name) = ('bb') WHERE id = 2").show
+    sql("select * from carbon_localsort_update").show()
     sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql("select * from carbon_globalsort").show()
     sql("UPDATE carbon_globalsort SET (name) = ('bb') WHERE id = 2").show
-
+    sql("select * from carbon_globalsort").show()
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
     checkAnswer(sql("SELECT name FROM carbon_globalsort WHERE id = 2"), Seq(Row("bb")))
     checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
@@ -326,7 +331,9 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val store  = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
-    new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default"+"_"+tableName)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+    new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 }