You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/25 17:37:48 UTC

[11/15] carbondata git commit: [CARBONDATA-1311] Added carbon storelocation to spark warehouse. And extracted storelocation out of metastore

[CARBONDATA-1311] Added carbon storelocation to spark warehouse. And extracted storelocation out of metastore

1. Changed default storelocation to sparkwouse, ,so if user does not provide storelocation then it chooses sparkwarelocation as store location.
2. Changed file metastore and avoid reading all schema files once keep it in memory, instead implemented cache based storage where it reads when request comes.
3. Extracted store location out of metastore and refactored carbonmetastore.

This closes #1176


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c3b26663
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c3b26663
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c3b26663

Branch: refs/heads/master
Commit: c3b266635e5275832792ea000736adf57117bab0
Parents: a3a12f0
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Fri Jul 14 11:12:57 2017 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Tue Jul 25 17:35:42 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 -
 .../core/metadata/AbsoluteTableIdentifier.java  |   5 +-
 .../carbondata/core/util/CarbonProperties.java  |   4 -
 .../apache/carbondata/core/util/CarbonUtil.java |  28 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |   6 +-
 .../org/apache/carbondata/api/CarbonStore.scala |   3 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   1 -
 .../apache/spark/sql/test/util/QueryTest.scala  |   6 +-
 .../spark/thriftserver/CarbonThriftServer.scala |   4 +-
 .../org/apache/spark/sql/CarbonContext.scala    |  12 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   3 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   4 +-
 .../spark/thriftserver/CarbonThriftServer.scala |   4 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |   3 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   2 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   4 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  12 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  13 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  54 ++-
 .../execution/CarbonLateDecodeStrategy.scala    |   2 +-
 .../execution/command/AlterTableCommands.scala  |  16 +-
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../sql/execution/command/DDLStrategy.scala     |   3 +-
 .../execution/command/carbonTableSchema.scala   |  35 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 383 +++++++++----------
 .../spark/sql/hive/CarbonHiveMetaStore.scala    | 132 +------
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  54 ++-
 .../spark/sql/hive/CarbonSessionState.scala     |   7 +-
 .../sql/test/Spark2TestQueryExecutor.scala      |   2 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  12 +-
 .../org/apache/spark/util/CleanFiles.scala      |   3 +-
 .../org/apache/spark/util/Compaction.scala      |   3 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   3 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   3 +-
 .../org/apache/spark/util/ShowSegments.scala    |   3 +-
 .../org/apache/spark/util/TableLoader.scala     |   3 +-
 .../apache/spark/util/CarbonCommandSuite.scala  |  27 +-
 .../carbondata/processing/merger/TableMeta.java |   4 +-
 .../util/CarbonDataProcessorUtil.java           |   6 +-
 39 files changed, 410 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index e29e356..c42338b 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -71,10 +71,6 @@ public final class CarbonCommonConstants {
   @CarbonProperty
   public static final String SORT_SIZE = "carbon.sort.size";
   /**
-   * default location of the carbon member, hierarchy and fact files
-   */
-  public static final String STORE_LOCATION_DEFAULT_VAL = "../carbon.store";
-  /**
    * CARDINALITY_INCREMENT_DEFAULT_VALUE
    */
   public static final int CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL = 10;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 3c39145..22faaf2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * identifier which will have store path and carbon table identifier
@@ -68,9 +67,9 @@ public class AbsoluteTableIdentifier implements Serializable {
     return carbonTableIdentifier;
   }
 
-  public static AbsoluteTableIdentifier from(String dbName, String tableName) {
+  public static AbsoluteTableIdentifier from(String storePath, String dbName, String tableName) {
     CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, "");
-    return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier);
+    return new AbsoluteTableIdentifier(storePath, identifier);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 3cd0f68..d96d522 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -83,10 +83,6 @@ public final class CarbonProperties {
     } catch (IllegalAccessException e) {
       LOGGER.error("Illelagal access to declared field" + e.getMessage());
     }
-    if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) {
-      carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION,
-          CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-    }
 
     validateBlockletSize();
     validateNumCores();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e9f817a..ac45da1 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -727,15 +727,6 @@ public final class CarbonUtil {
         .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX);
   }
 
-  public static String getCarbonStorePath() {
-    CarbonProperties prop = CarbonProperties.getInstance();
-    if (null == prop) {
-      return null;
-    }
-    return prop.getProperty(CarbonCommonConstants.STORE_LOCATION,
-        CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-  }
-
   /**
    * This method will check the existence of a file at a given path
    */
@@ -1797,6 +1788,25 @@ public final class CarbonUtil {
   }
 
   /**
+   * Removes schema from properties
+   * @param properties
+   * @return
+   */
+  public static Map<String, String> removeSchemaFromMap(Map<String, String> properties) {
+    Map<String, String> newMap = new HashMap<>();
+    newMap.putAll(properties);
+    String partsNo = newMap.get("carbonSchemaPartsNo");
+    if (partsNo == null) {
+      return newMap;
+    }
+    int no = Integer.parseInt(partsNo);
+    for (int i = 0; i < no; i++) {
+      newMap.remove("carbonSchema" + i);
+    }
+    return newMap;
+  }
+
+  /**
    * This method will read the schema file from a given path
    *
    * @param schemaFilePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 5b603aa..5cef14a 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -213,8 +213,10 @@ public final class CarbonLoaderUtil {
     String tempLocationKey = CarbonDataProcessorUtil
         .getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), isCompactionFlow);
     // form local store location
-    final String localStoreLocation = CarbonProperties.getInstance()
-        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey);
+    if (localStoreLocation == null) {
+      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+    }
     // submit local folder clean up in another thread so that main thread execution is not blocked
     ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 45719fc..dc37360 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -157,8 +157,9 @@ object CarbonStore {
   def isSegmentValid(
       dbName: String,
       tableName: String,
+      storePath: String,
       segmentId: String): Boolean = {
-    val identifier = AbsoluteTableIdentifier.from(dbName, tableName)
+    val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
     val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
         SegmentStatusManager(
           identifier).getValidAndInvalidSegments

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index b76bca3..149e3b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -53,7 +53,6 @@ object TestQueryExecutor {
   val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister]
   CarbonProperties.getInstance()
     .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
-    .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
     .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords")
   private def lookupQueryExecutor: Class[_] = {
     ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index b4b7602..146df1b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -28,6 +28,9 @@ import org.apache.spark.sql.test.TestQueryExecutor
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 class QueryTest extends PlanTest {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -86,7 +89,8 @@ class QueryTest extends PlanTest {
 
   val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
 
-  val storeLocation = TestQueryExecutor.storeLocation
+  lazy val storeLocation = CarbonProperties.getInstance().
+    getProperty(CarbonCommonConstants.STORE_LOCATION)
   val resourcesPath = TestQueryExecutor.resourcesPath
   val integrationPath = TestQueryExecutor.integrationPath
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index b8ba9f7..f8275d1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -57,8 +57,8 @@ object CarbonThriftServer {
                   "Using default Value and proceeding")
         Thread.sleep(30000)
     }
-
-    val cc = new CarbonContext(sc, args.head)
+    val storePath = if (args.length > 0) args.head else null
+    val cc = new CarbonContext(sc, storePath)
 
     HiveThriftServer2.startWithContext(cc)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 1aeda95..da4b210 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -44,7 +44,7 @@ class CarbonContext(
 
   def this(sc: SparkContext) = {
     this(sc,
-      new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+      null,
       new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
   }
 
@@ -66,8 +66,14 @@ class CarbonContext(
 
   @transient
   override lazy val catalog = {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+    val carbonProperties = CarbonProperties.getInstance()
+    if (storePath != null) {
+      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+      // In case if it is in carbon.properties for backward compatible
+    } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+        conf.getConfString("spark.sql.warehouse.dir"))
+    }
     new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 2fc93e6..71ef6a6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -69,7 +69,8 @@ private[sql] case class CarbonDatasourceHadoopRelation(
       carbonTable.getDatabaseName,
       carbonTable.getFactTableName,
       CarbonSparkUtil.createSparkMeta(carbonTable),
-      new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
+      new TableMeta(carbonTable.getCarbonTableIdentifier,
+        paths.head, absIdentifier.getTablePath, carbonTable),
       None
     )(sqlContext)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index e8d3907..7790f59 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -236,7 +236,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
                   CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
                   val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
                   metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
+                    null, carbonTable)
                 }
               }
             })
@@ -281,7 +281,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
     tableInfo.setStorePath(storePath)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null,
       CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
 
     val fileType = FileFactory.getFileType(schemaMetadataPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index aba6891..34ac940 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -53,9 +53,9 @@ object CarbonThriftServer {
       System.setProperty("carbon.properties.filepath", sparkConf.get("carbon.properties.filepath"))
     }
 
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, args.head)
+    val storePath = if (args.length > 0) args.head else null
 
-    val spark = builder.getOrCreateCarbonSession(args.head)
+    val spark = builder.getOrCreateCarbonSession(storePath)
     val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
     try {
       Thread.sleep(Integer.parseInt(warmUpTime))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index d1d3015..de7f3fb 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -48,7 +48,8 @@ object CarbonSparkUtil {
   def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
     val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val table = CarbonTable.buildFromTableInfo(tableInfo)
-    val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table)
+    val meta = new TableMeta(identifier.getCarbonTableIdentifier,
+      identifier.getStorePath, tablePath, table)
     CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
       CarbonSparkUtil.createSparkMeta(table), meta)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 1054c62..805a421 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 33091aa..43f6a21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -67,7 +67,7 @@ case class CarbonDictionaryDecoder(
 
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
-      val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+      val storePath = CarbonEnv.getInstance(sparkSession).storePath
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -121,7 +121,7 @@ case class CarbonDictionaryDecoder(
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
 
-    val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d19eb39..9d10ea0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -40,6 +40,8 @@ class CarbonEnv {
 
   var carbonSessionInfo: CarbonSessionInfo = _
 
+  var storePath: String = _
+
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -61,10 +63,14 @@ class CarbonEnv {
       // add session params after adding DefaultCarbonParams
       config.addDefaultCarbonSessionParams()
       carbonMetastore = {
-        val storePath =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+        val properties = CarbonProperties.getInstance()
+        storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+        if (storePath == null) {
+          storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
+          properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+        }
         LOGGER.info(s"carbon env initial: $storePath")
-        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath)
+        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index b436891..7390cf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -65,7 +65,7 @@ object CarbonSession {
 
     def getOrCreateCarbonSession(): SparkSession = {
       getOrCreateCarbonSession(
-        new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+        null,
         new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
     }
 
@@ -145,9 +145,14 @@ object CarbonSession {
           }
           sc
         }
-
-        CarbonProperties.getInstance()
-          .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+        val carbonProperties = CarbonProperties.getInstance()
+        if (storePath != null) {
+          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+          // In case if it is in carbon.properties for backward compatible
+        } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+            sparkContext.conf.get("spark.sql.warehouse.dir"))
+        }
         session = new CarbonSession(sparkContext)
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 498ea03..bec163b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,15 +25,19 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.{CreateTable, TableModel, TableNewProcessor}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DecimalType, StructType}
+import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -146,7 +150,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
 
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
-      dataSchema: StructType): String = {
+      dataSchema: StructType) = {
 
     val dbName: String = parameters.getOrElse("dbName",
       CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
@@ -158,7 +162,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       } else {
         CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(Option(dbName), tableName)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName"
       }
     } catch {
       case ex: NoSuchTableException =>
@@ -168,7 +172,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
           dbName,
           tableName)
         CreateTable(cm, false).run(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        getPathForTable(sparkSession, dbName, tableName, parameters)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
     }
@@ -195,9 +199,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       if (parameters.contains("tablePath")) {
         parameters.get("tablePath").get
       } else {
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        relation.tableMeta.tablePath
       }
     } catch {
       case ex: Exception =>
@@ -234,22 +238,46 @@ object CarbonSource {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
-    if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) {
+    if (!properties.contains("carbonSchemaPartsNo")) {
       val dbName: String = properties.getOrElse("dbName",
         CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
       val tableName: String = properties.getOrElse("tableName", "").toLowerCase
       val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName)
       val tableInfo: TableInfo = TableNewProcessor(model)
-      val (tablePath, carbonSchemaString) =
-      metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession)
-      val map = CarbonUtil.convertToMultiStringMap(tableInfo)
+      val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName
+      val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+      schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+      tableInfo.getFactTable.getSchemaEvalution.
+        getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+      val map = if (metaStore.isReadFromHiveMetaStore) {
+        val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+        val schemaMetadataPath =
+          CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+        tableInfo.setMetaDataFilepath(schemaMetadataPath)
+        tableInfo.setStorePath(tableIdentifier.getStorePath)
+        CarbonUtil.convertToMultiStringMap(tableInfo)
+      } else {
+        metaStore.saveToDisk(tableInfo, tablePath)
+        new java.util.HashMap[String, String]()
+      }
       properties.foreach(e => map.put(e._1, e._2))
       map.put("tablePath", tablePath)
       // updating params
       val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
       tableDesc.copy(storage = updatedFormat)
     } else {
-      tableDesc
+      val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
+      if (!metaStore.isReadFromHiveMetaStore) {
+        // save to disk
+        metaStore.saveToDisk(tableInfo, properties.get("tablePath").get)
+        // remove schema string from map as we don't store carbon schema to hive metastore
+        val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
+        val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+        tableDesc.copy(storage = updatedFormat)
+      } else {
+        tableDesc
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 1cc6668..33bba8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -102,7 +102,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       CarbonAliasDecoderRelation(),
       rdd,
       output,
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath,
+      CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
       table.carbonTable.getTableInfo.serialize())
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 0d5a821..17e456d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
 import org.apache.spark.util.AlterTableUtil
 
@@ -29,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -167,11 +168,12 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       locks = AlterTableUtil
         .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
             sparkSession)
-      carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].tableMeta.carbonTable
+      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].tableMeta
+      carbonTable = tableMeta.carbonTable
       // get the latest carbon table and check for column existence
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath.
+        getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
       val tableMetadataFile = carbonTablePath.getPath
       val tableInfo: org.apache.carbondata.format.TableInfo =
         metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -196,7 +198,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
           carbonTable.getCarbonTableIdentifier,
           tableInfo,
           schemaEvolutionEntry,
-          carbonTable.getStorePath)(sparkSession)
+          tableMeta.tablePath)(sparkSession)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
         .runSqlHive(
@@ -206,6 +208,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
           s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
           s"('tableName'='$newTableName', " +
           s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+        Some(oldDatabaseName)).quotedString)
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 609f39b..2731104 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -41,8 +41,8 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
         CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
       }
     }
-    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession)
-      .carbonMetastore.storePath)
+    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase,
+      CarbonEnv.getInstance(sparkSession).storePath)
     rows
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 760cb06..18d2dc7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -62,8 +62,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       _, child: LogicalPlan, _, _) =>
         ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).
-          carbonMetastore.storePath)
+        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 65ef2b2..15bf460 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -156,7 +156,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
     carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+    carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
 
     var storeLocation = CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
@@ -196,7 +196,9 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
     val tbName = cm.tableName
@@ -218,10 +220,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
     } else {
+      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val (tablePath, carbonSchemaString) =
-        catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+      val tablePath = tableIdentifier.getTablePath
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
       if (createDSTable) {
         try {
           val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
@@ -239,7 +242,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
             CarbonEnv.getInstance(sparkSession).carbonMetastore
-                .dropTable(catalog.storePath, identifier)(sparkSession)
+                .dropTable(tablePath, identifier)(sparkSession)
 
             LOGGER.audit(s"Table creation with Database name [$dbName] " +
                 s"and Table name [$tbName] failed")
@@ -332,9 +335,7 @@ object LoadTable {
       tableInfo, entry, carbonTablePath.getPath)(sparkSession)
 
     // update the schema modified time
-    metastore.updateAndTouchSchemasUpdatedTime(
-      carbonLoadModel.getDatabaseName,
-      carbonLoadModel.getTableName)
+    metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
 
     // update CarbonDataLoadSchema
     val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
@@ -526,7 +527,7 @@ case class LoadTable(
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+      carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
 
       val table = relation.tableMeta.carbonTable
       carbonLoadModel.setTableName(table.getFactTableName)
@@ -882,7 +883,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    catalog.checkSchemasModifiedTimeAndReloadTables()
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
+        dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
@@ -891,7 +895,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
 
       CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .dropTable(catalog.storePath, identifier)(sparkSession)
+          .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } catch {
       case ex: Exception =>
@@ -910,10 +914,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val metadataFilePath = CarbonStorePath
-        .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val metadataFilePath =
+      CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
     val fileType = FileFactory.getFileType(metadataFilePath)
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       val file = FileFactory.getCarbonFile(metadataFilePath, fileType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 549841b..2407054 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     System.nanoTime() + ""
   }
 
-  lazy val metadata = loadMetadata(storePath, nextQueryId)
+  val metadata = MetaData(new ArrayBuffer[TableMeta]())
 
 
   /**
@@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   override def createCarbonRelation(parameters: Map[String, String],
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation = {
-    lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
-      Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
-      .asInstanceOf[CarbonRelation]
+    val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
+    val tables = getTableFromMetadataCache(database, tableName)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableName,
+          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+      case None =>
+        readCarbonSchema(absIdentifier) match {
+          case Some(meta) =>
+            CarbonRelation(database, tableName,
+              CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+          case None =>
+            throw new NoSuchTableException(database, tableName)
+        }
+    }
   }
 
   def lookupRelation(dbName: Option[String], tableName: String)
@@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
 
-  def lookupRelation(tableIdentifier: TableIdentifier)
+  override def lookupRelation(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): LogicalPlan = {
-    checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase
-    )
-    val tables = getTableFromMetadata(database, tableIdentifier.table, true)
-    tables match {
-      case Some(t) =>
-        CarbonRelation(database, tableIdentifier.table,
-          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
-      case None =>
-        throw new NoSuchTableException(database, tableIdentifier.table)
+      sparkSession.catalog.currentDatabase)
+    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+      case SubqueryAlias(_,
+      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+      _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case LogicalRelation(
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
     }
+    relation
   }
 
   /**
@@ -123,8 +138,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    * @return
    */
-  def getTableFromMetadata(database: String,
-      tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
     metadata.tablesMeta
       .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
                  c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
@@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    tableExists(TableIdentifier(table, databaseOp))(sparkSession)
   }
 
-  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tables = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
-    tables.nonEmpty
-  }
-
-  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    // creating zookeeper instance once.
-    // if zookeeper is configured as carbon lock type.
-    val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
-    if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
-    }
-    if (metadataPath == null) {
-      return null
-    }
-    // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
-        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
-          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
-        )
-      LOGGER.info("Default lock type HDFSLOCK is configured")
+  override def tableExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    try {
+      lookupRelation(tableIdentifier)(sparkSession)
+    } catch {
+      case e: Exception =>
+        return false
     }
-    val fileType = FileFactory.getFileType(metadataPath)
-    val metaDataBuffer = new ArrayBuffer[TableMeta]
-    fillMetaData(metadataPath, fileType, metaDataBuffer)
-    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
-      System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    MetaData(metaDataBuffer)
+    true
   }
 
-  private def fillMetaData(basePath: String, fileType: FileType,
-      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
-    val databasePath = basePath // + "/schemas"
-    try {
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val file = FileFactory.getCarbonFile(databasePath, fileType)
-        val databaseFolders = file.listFiles()
-
-        databaseFolders.foreach(databaseFolder => {
-          if (databaseFolder.isDirectory) {
-            val dbName = databaseFolder.getName
-            val tableFolders = databaseFolder.listFiles()
-
-            tableFolders.foreach(tableFolder => {
-              if (tableFolder.isDirectory) {
-                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                  tableFolder.getName, UUID.randomUUID().toString)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
-                  carbonTableIdentifier)
-                val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
-                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-                  val tableName = tableFolder.getName
-                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-                  val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
-                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
-                  val wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
-                  val schemaFilePath = CarbonStorePath
-                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-                  wrapperTableInfo.setStorePath(storePath)
-                  wrapperTableInfo
-                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
-                }
-              }
-            })
-          }
-        })
-      } else {
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        s.printStackTrace()
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = identifier.getCarbonTableIdentifier.getTableName
+    val storePath = identifier.getStorePath
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
+      tableName.toLowerCase(), UUID.randomUUID().toString)
+    val carbonTablePath =
+      CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val fileType = FileFactory.getFileType(tableMetadataFile)
+    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+      val tableUniqueName = dbName + "_" + tableName
+      val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+      val schemaFilePath = CarbonStorePath
+        .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+      wrapperTableInfo.setStorePath(storePath)
+      wrapperTableInfo
+        .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+      val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
+        identifier.getStorePath,
+        identifier.getTablePath,
+        carbonTable)
+      metadata.tablesMeta += tableMeta
+      Some(tableMeta)
+    } else {
+      None
     }
   }
 
@@ -238,15 +201,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String) (sparkSession: SparkSession): String = {
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
@@ -255,11 +218,19 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
       .fromExternalToWrapperTableInfo(thriftTableInfo,
           newTableIdentifier.getDatabaseName,
           newTableIdentifier.getTableName,
-          carbonStorePath)
-    createSchemaThriftFile(wrapperTableInfo,
+        absoluteTableIdentifier.getStorePath)
+    val identifier =
+      new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        wrapperTableInfo.getFactTable.getTableId)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
+      identifier)
+    addTableCache(wrapperTableInfo,
+      AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
       newTableIdentifier.getDatabaseName,
-      newTableIdentifier.getTableName)(sparkSession)
+      newTableIdentifier.getTableName))
+    path
   }
 
   /**
@@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         carbonTableIdentifier.getDatabaseName,
         carbonTableIdentifier.getTableName,
-        carbonStorePath)
+        tableIdentifier.getStorePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    createSchemaThriftFile(wrapperTableInfo,
+    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      carbonTableIdentifier.getDatabaseName,
-      carbonTableIdentifier.getTableName)(sparkSession)
+      tableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, tableIdentifier)
+    path
   }
 
 
@@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * Load CarbonTable from wrapper tableInfo
    *
    */
-  def createTableFromThrift(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
-    if (tableExists(tableName, Some(dbName))(sparkSession)) {
-      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
-    }
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+  def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val dbName = tableInfo.getDatabaseName
+    val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-      .add(schemaEvolutionEntry)
-    val carbonTablePath = createSchemaThriftFile(tableInfo,
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    tableInfo.setStorePath(identifier.getStorePath)
+    createSchemaThriftFile(tableInfo,
       thriftTableInfo,
-      dbName,
-      tableName)(sparkSession)
+      identifier.getCarbonTableIdentifier)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    (carbonTablePath, "")
+  }
+
+  /**
+   * Generates schema string from TableInfo
+   */
+  override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+      tablePath: String): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(tableIdentifier.getStorePath)
+    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    addTableCache(tableInfo, tableIdentifier)
+    CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
   }
 
   /**
@@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param tableInfo
    * @param thriftTableInfo
-   * @param dbName
-   * @param tableName
-   * @param sparkSession
    * @return
    */
-  private def createSchemaThriftFile(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      dbName: String, tableName: String)
-    (sparkSession: SparkSession): String = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+  private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
+      thriftTableInfo: TableInfo,
+      carbonTableIdentifier: CarbonTableIdentifier): String = {
+    val carbonTablePath = CarbonStorePath.
+      getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    removeTableFromMetadata(dbName, tableName)
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+    carbonTablePath.getPath
+  }
+
+  protected def addTableCache(tableInfo: table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+    val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
+    CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
+    removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getTablePath,
+      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
     metadata.tablesMeta += tableMeta
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
   }
 
   /**
@@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
     metadataToBeRemoved match {
       case Some(tableMeta) =>
         metadata.tablesMeta -= tableMeta
         CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
       case None =>
-        LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        if (LOGGER.isDebugEnabled) {
+          LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        }
     }
   }
 
@@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
 
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = tableIdentifier.table.toLowerCase
-
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
-    val fileType = FileFactory.getFileType(tablePath)
-    FileFactory.isFileExist(tablePath, fileType)
+    try {
+      val tablePath = lookupRelation(tableIdentifier)(sparkSession).
+        asInstanceOf[CarbonRelation].tableMeta.tablePath
+      val fileType = FileFactory.getFileType(tablePath)
+      FileFactory.isFileExist(tablePath, fileType)
+    } catch {
+      case e: Exception =>
+        false
+    }
   }
 
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession) {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables
-
-      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
-        tableIdentifier.table)
-      metadataToBeRemoved match {
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-        case None =>
-          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
-      }
+      checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+
+      removeTableFromMetadata(dbName, tableName)
+      updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
     }
   }
 
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+  private def getTimestampFileAndType(basePath: String) = {
+    val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
     (timestampFile, timestampFileType)
   }
@@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     tableModifiedTimeStore.put("default", timeStamp)
   }
 
-  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+  def updateAndTouchSchemasUpdatedTime(basePath: String) {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
   }
 
-  /**
-   * This method will read the timestamp of empty schema file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-    } else {
-      System.currentTimeMillis()
-    }
-  }
 
   /**
    * This method will check and create an empty schema timestamp file
    *
-   * @param databaseName
-   * @param tableName
    * @return
    */
-  private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+  private def touchSchemaFileSystemTime(basePath: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      LOGGER.audit(s"Creating timestamp file for $basePath")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
     val systemTime = System.currentTimeMillis()
@@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     systemTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables() {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+  def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+    val (timestampFile, timestampFileType) =
+      getTimestampFileAndType(storePath)
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
@@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   }
 
   private def refreshCache() {
-    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+    metadata.tablesMeta.clear()
   }
 
   override def isReadFromHiveMetaStore: Boolean = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3b26663/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a8f92ce..c328130 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -16,21 +16,15 @@
  */
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
@@ -41,12 +35,10 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil
 /**
  * Metastore to store carbonschema in hive
  */
-class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
-  extends CarbonFileMetastore(conf, storePath) {
+class CarbonHiveMetaStore(conf: RuntimeConfig) extends CarbonFileMetastore(conf) {
 
   override def isReadFromHiveMetaStore: Boolean = true
 
-
   /**
    * Create spark session from paramters.
    *
@@ -61,7 +53,7 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
     if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
       val meta = new TableMeta(table.getCarbonTableIdentifier,
-        table.getStorePath, table)
+        absIdentifier.getStorePath, absIdentifier.getTablePath, table)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
         CarbonSparkUtil.createSparkMeta(table), meta)
     } else {
@@ -69,111 +61,30 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
     }
   }
 
-  override def lookupRelation(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): LogicalPlan = {
-    val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase)
-    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-      case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
-      _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case LogicalRelation(
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
-    }
-    relation
-  }
-
-  /**
-   * This method will search for a table in the catalog metadata
-   *
-   * @param database
-   * @param tableName
-   * @return
-   */
-  override def getTableFromMetadata(database: String,
-      tableName: String,
-      readStore: Boolean): Option[TableMeta] = {
-    if (!readStore) {
-      None
-    } else {
-      super.getTableFromMetadata(database, tableName, readStore)
-    }
-  }
-
-  override def tableExists(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): Boolean = {
-    try {
-      lookupRelation(tableIdentifier)(sparkSession)
-    } catch {
-      case e: Exception =>
-        return false
-    }
-    true
-  }
-
-  override def loadMetadata(metadataPath: String,
-      queryId: String): MetaData = {
-    MetaData(new ArrayBuffer[TableMeta])
-  }
-
-
-  /**
-   *
-   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
-   * Load CarbonTable from wrapper tableInfo
-   *
-   */
-  override def createTableFromThrift(tableInfo: TableInfo, dbName: String,
-      tableName: String)(sparkSession: SparkSession): (String, String) = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
-    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
-    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    removeTableFromMetadata(dbName, tableName)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    (carbonTablePath.getPath, CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ","))
-  }
-
-  /**
-   * This method will remove the table meta from catalog metadata array
-   *
-   * @param dbName
-   * @param tableName
-   */
-  override def removeTableFromMetadata(dbName: String,
-      tableName: String): Unit = {
-    // do nothing
-  }
 
   override def isTablePathExists(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): Boolean = {
     tableExists(tableIdentifier)(sparkSession)
   }
 
-  override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  override def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): Unit = {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
+    checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+    removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
   }
 
-  override def checkSchemasModifiedTimeAndReloadTables(): Unit = {
+  override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
     // do nothing now
   }
 
@@ -200,23 +111,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param carbonStorePath
    * @param sparkSession
    */
   override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
+      tablePath: String)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
     updateHiveMetaStore(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
-      carbonStorePath,
+      identifier.getStorePath,
       sparkSession,
       schemaConverter)
   }
@@ -232,23 +143,20 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
         carbonStorePath)
-    wrapperTableInfo.setStorePath(storePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier)
+    wrapperTableInfo.setStorePath(carbonStorePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
-    val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      wrapperTableInfo.getFactTable.getTableId)
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
       s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
-    removeTableFromMetadata(wrapperTableInfo.getDatabaseName,
-      wrapperTableInfo.getFactTable.getTableName)
+    removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath
+    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
   }
 
   /**
@@ -256,21 +164,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
    * @param sparkSession
    */
   override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      carbonStorePath: String)
+      tablePath: String)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
     updateHiveMetaStore(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
-      carbonStorePath,
+      identifier.getStorePath,
       sparkSession,
       schemaConverter)
   }
+
+
 }