You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/20 02:52:36 UTC

[1/2] carbondata git commit: [CARBONDATA-1311] Added carbon storelocation to spark warehouse. And extracted storelocation out of metastore [Forced Update!]

Repository: carbondata
Updated Branches:
  refs/heads/metadata 34d2870ae -> 9d8abbdf9 (forced update)


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index ab27b4f..bfe1276 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -50,17 +50,6 @@ trait CarbonMetaStore {
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation
 
-  /**
-   * Get table meta
-   * TODO remove it if possible
-   * @param database
-   * @param tableName
-   * @param readStore
-   * @return
-   */
-  def getTableFromMetadata(database: String,
-      tableName: String,
-      readStore: Boolean = false): Option[TableMeta]
 
   def tableExists(
     table: String,
@@ -68,8 +57,6 @@ trait CarbonMetaStore {
 
   def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
 
-  def loadMetadata(metadataPath: String, queryId: String): MetaData
-
   /**
    * This method will overwrite the existing schema and update it with the given details
    *
@@ -90,23 +77,26 @@ trait CarbonMetaStore {
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)
+      tablePath: String)
     (sparkSession: SparkSession): String
 
   /**
-   *
-   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
-   * Load CarbonTable from wrapper tableInfo
-   *
+   * Prepare Thrift Schema from wrapper TableInfo and write to disk
    */
-  def createTableFromThrift(tableInfo: table.TableInfo,
-      dbName: String,
-      tableName: String)(sparkSession: SparkSession): (String, String)
+  def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String)
+
+  /**
+   * Generates schema string to save it in hive metastore
+   * @param tableInfo
+   * @return
+   */
+  def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+      tablePath: String): String
 
   /**
    * This method will remove the table meta from catalog metadata array
@@ -117,25 +107,25 @@ trait CarbonMetaStore {
   def removeTableFromMetadata(dbName: String, tableName: String): Unit
 
   def updateMetadataByThriftTable(schemaFilePath: String,
-      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit
+      tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
 
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession)
 
-  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String)
+  def updateAndTouchSchemasUpdatedTime(basePath: String)
 
-  def checkSchemasModifiedTimeAndReloadTables()
+  def checkSchemasModifiedTimeAndReloadTables(storePath: String)
 
   def isReadFromHiveMetaStore : Boolean
 
   def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
 
-  def storePath: String
-
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
 
+  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta]
+
 }
 
 /**
@@ -143,12 +133,12 @@ trait CarbonMetaStore {
  */
 object CarbonMetaStoreFactory {
 
-  def createCarbonMetaStore(conf: RuntimeConfig, storePath: String): CarbonMetaStore = {
+  def createCarbonMetaStore(conf: RuntimeConfig): CarbonMetaStore = {
     val readSchemaFromHiveMetaStore = readSchemaFromHive(conf)
     if (readSchemaFromHiveMetaStore) {
-      new CarbonHiveMetaStore(conf, storePath)
+      new CarbonHiveMetaStore(conf)
     } else {
-      new CarbonFileMetastore(conf, storePath)
+      new CarbonFileMetastore(conf)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 4aef118..99a935b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -94,16 +94,17 @@ class CarbonSessionCatalog(
   private def refreshRelationFromCache(name: TableIdentifier,
       alias: Option[String],
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): LogicalPlan = {
-    carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTables
+    carbonEnv.carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(sparkSession).storePath)
     carbonEnv.carbonMetastore
-      .getTableFromMetadata(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
         carbonDatasourceHadoopRelation.carbonTable.getFactTableName) match {
       case tableMeta: TableMeta =>
         if (tableMeta.carbonTable.getTableLastUpdatedTime !=
             carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) {
           refreshTable(name)
         }
-      case _ => refreshTable(name)
+      case _ =>
     }
     super.lookupRelation(name, alias)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index e94b6ed..f764882 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -52,7 +52,7 @@ object Spark2TestQueryExecutor {
     .appName("Spark2TestQueryExecutor")
     .enableHiveSupport()
     .config("spark.sql.warehouse.dir", TestQueryExecutor.warehouse)
-    .getOrCreateCarbonSession(TestQueryExecutor.storeLocation, TestQueryExecutor.metastoredb)
+    .getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb)
   spark.sparkContext.setLogLevel("ERROR")
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 01bdc4f..74f4dd0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -136,8 +136,9 @@ object AlterTableUtil {
         carbonTable.getCarbonTableIdentifier,
         thriftTable,
         schemaEvolutionEntry,
-        carbonTable.getStorePath)(sparkSession)
+        carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
     val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession).schema.json
     val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
@@ -206,7 +207,8 @@ object AlterTableUtil {
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        oldTableIdentifier.table)
         val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
-        metastore.revertTableSchema(tableIdentifier, tableInfo, storePath)(sparkSession)
+        metastore.revertTableSchema(tableIdentifier,
+          tableInfo, carbonTablePath.getPath)(sparkSession)
         metastore.removeTableFromMetadata(database, newTableName)
       }
     }
@@ -238,7 +240,7 @@ object AlterTableUtil {
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
       metastore
         .revertTableSchema(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getStorePath)(sparkSession)
+          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
 
@@ -273,7 +275,7 @@ object AlterTableUtil {
       }
       metastore
         .revertTableSchema(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getStorePath)(sparkSession)
+          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
 
@@ -311,7 +313,7 @@ object AlterTableUtil {
       }
       metastore
         .revertTableSchema(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getStorePath)(sparkSession)
+          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 645081f..df73641 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -47,7 +47,8 @@ object CleanFiles {
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    CarbonEnv.getInstance(spark).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
     cleanFiles(spark, dbName, tableName, storePath)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index d78fd5f..d00cb84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -56,7 +56,8 @@ object Compaction {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val compactionType = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    CarbonEnv.getInstance(spark).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
     compaction(spark, dbName, tableName, compactionType)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index f67a5ce..4aaec8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -47,7 +47,8 @@ object DeleteSegmentByDate {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val dateValue = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    CarbonEnv.getInstance(spark).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
     deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index bbf386e..c86c7f5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -52,7 +52,8 @@ object DeleteSegmentById {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    CarbonEnv.getInstance(spark).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
     deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index d5788ba..19d7dce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -78,7 +78,8 @@ object ShowSegments {
       None
     }
     val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    CarbonEnv.getInstance(spark).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
     val rows = showSegments(spark, dbName, tableName, limit)
     System.out.println(showString(rows))
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 82d8da2..8adaf00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -80,7 +80,8 @@ object TableLoader {
 
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
-    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    CarbonEnv.getInstance(spark).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
     loadTable(spark, Option(dbName), tableName, inputPaths, map)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index 25be4a0..fc67cdf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 
 class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
 
@@ -42,32 +43,34 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
     dropTable("carbon_table")
   }
 
+  private lazy val location =
+    CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
   test("show segment") {
-    ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table"))
+    ShowSegments.main(Array(s"${location}", "carbon_table"))
   }
 
   test("delete segment by id") {
-    DeleteSegmentById.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table", "0"))
-    assert(!CarbonStore.isSegmentValid("default", "carbon_table", "0"))
+    DeleteSegmentById.main(Array(s"${location}", "carbon_table", "0"))
+    assert(!CarbonStore.isSegmentValid("default", "carbon_table",location,  "0"))
   }
 
   test("delete segment by date") {
     createAndLoadTestTable("carbon_table2", "csv_table")
     val time = new Timestamp(new Date().getTime)
-    DeleteSegmentByDate.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table2", time.toString))
-    assert(!CarbonStore.isSegmentValid("default", "carbon_table2", "0"))
+    DeleteSegmentByDate.main(Array(s"${location}", "carbon_table2", time.toString))
+    assert(!CarbonStore.isSegmentValid("default", "carbon_table2", location, "0"))
     dropTable("carbon_table2")
   }
 
   test("clean files") {
     val table = "carbon_table3"
     createAndLoadTestTable(table, "csv_table")
-    ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
-    DeleteSegmentById.main(Array(s"${CarbonUtil.getCarbonStorePath}", table, "0"))
-    ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
-    CleanFiles.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
-    ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
-    val tablePath = s"${CarbonUtil.getCarbonStorePath}${File.separator}default${File.separator}$table"
+    ShowSegments.main(Array(s"${location}", table))
+    DeleteSegmentById.main(Array(s"${location}", table, "0"))
+    ShowSegments.main(Array(s"${location}", table))
+    CleanFiles.main(Array(s"${location}", table))
+    ShowSegments.main(Array(s"${location}", table))
+    val tablePath = s"${location}${File.separator}default${File.separator}$table"
     val f = new File(s"$tablePath/Fact/Part0")
     assert(f.isDirectory)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
index b84d695..09dbfff 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
@@ -29,11 +29,13 @@ public class TableMeta implements Serializable {
   public CarbonTableIdentifier carbonTableIdentifier;
   public String storePath;
   public CarbonTable carbonTable;
+  public String tablePath;
 
-  public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
+  public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, String tablePath,
       CarbonTable carbonTable) {
     this.carbonTableIdentifier = carbonTableIdentifier;
     this.storePath = storePath;
+    this.tablePath = tablePath;
     this.carbonTable = carbonTable;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 62f13db..17f4df6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -173,8 +173,10 @@ public final class CarbonDataProcessorUtil {
       String taskId, String partitionId, String segmentId, boolean isCompactionFlow) {
     String tempLocationKey =
         getTempStoreLocationKey(databaseName, tableName, taskId, isCompactionFlow);
-    String baseStorePath = CarbonProperties.getInstance()
-        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    String baseStorePath = CarbonProperties.getInstance().getProperty(tempLocationKey);
+    if (baseStorePath == null) {
+      LOGGER.warn("Location not set for the key " + tempLocationKey);
+    }
     CarbonTable carbonTable = CarbonMetadata.getInstance()
         .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
     CarbonTablePath carbonTablePath =


[2/2] carbondata git commit: [CARBONDATA-1311] Added carbon storelocation to spark warehouse. And extracted storelocation out of metastore

Posted by ja...@apache.org.
[CARBONDATA-1311] Added carbon storelocation to spark warehouse. And extracted storelocation out of metastore

1. Changed default storelocation to sparkwouse, ,so if user does not provide storelocation then it chooses sparkwarelocation as store location.
2. Changed file metastore and avoid reading all schema files once keep it in memory, instead implemented cache based storage where it reads when request comes.
3. Extracted store location out of metastore and refactored carbonmetastore.

This closes #1176


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9d8abbdf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9d8abbdf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9d8abbdf

Branch: refs/heads/metadata
Commit: 9d8abbdf955d0f42ff4572995719efd561bea961
Parents: 8d3c9bf
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Fri Jul 14 11:12:57 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Jul 20 10:52:18 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 -
 .../core/metadata/AbsoluteTableIdentifier.java  |   5 +-
 .../carbondata/core/util/CarbonProperties.java  |   4 -
 .../apache/carbondata/core/util/CarbonUtil.java |  28 +-
 .../spark/sql/common/util/QueryTest.scala       |   6 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |   6 +-
 .../org/apache/carbondata/api/CarbonStore.scala |   3 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   1 -
 .../spark/thriftserver/CarbonThriftServer.scala |   4 +-
 .../org/apache/spark/sql/CarbonContext.scala    |  12 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   3 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   4 +-
 .../spark/thriftserver/CarbonThriftServer.scala |   4 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |   3 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   2 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   4 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  12 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  13 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  54 ++-
 .../execution/CarbonLateDecodeStrategy.scala    |   2 +-
 .../execution/command/AlterTableCommands.scala  |  16 +-
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../sql/execution/command/DDLStrategy.scala     |   3 +-
 .../execution/command/carbonTableSchema.scala   |  35 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 383 +++++++++----------
 .../spark/sql/hive/CarbonHiveMetaStore.scala    | 132 +------
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  54 ++-
 .../spark/sql/hive/CarbonSessionState.scala     |   7 +-
 .../sql/test/Spark2TestQueryExecutor.scala      |   2 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  12 +-
 .../org/apache/spark/util/CleanFiles.scala      |   3 +-
 .../org/apache/spark/util/Compaction.scala      |   3 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   3 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   3 +-
 .../org/apache/spark/util/ShowSegments.scala    |   3 +-
 .../org/apache/spark/util/TableLoader.scala     |   3 +-
 .../apache/spark/util/CarbonCommandSuite.scala  |  27 +-
 .../carbondata/processing/merger/TableMeta.java |   4 +-
 .../util/CarbonDataProcessorUtil.java           |   6 +-
 39 files changed, 410 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 55a292e..f6e5c62 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -71,10 +71,6 @@ public final class CarbonCommonConstants {
   @CarbonProperty
   public static final String SORT_SIZE = "carbon.sort.size";
   /**
-   * default location of the carbon member, hierarchy and fact files
-   */
-  public static final String STORE_LOCATION_DEFAULT_VAL = "../carbon.store";
-  /**
    * CARDINALITY_INCREMENT_DEFAULT_VALUE
    */
   public static final int CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL = 10;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 3c39145..22faaf2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * identifier which will have store path and carbon table identifier
@@ -68,9 +67,9 @@ public class AbsoluteTableIdentifier implements Serializable {
     return carbonTableIdentifier;
   }
 
-  public static AbsoluteTableIdentifier from(String dbName, String tableName) {
+  public static AbsoluteTableIdentifier from(String storePath, String dbName, String tableName) {
     CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, "");
-    return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier);
+    return new AbsoluteTableIdentifier(storePath, identifier);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index d14b7ab..2f5874b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -81,10 +81,6 @@ public final class CarbonProperties {
     } catch (IllegalAccessException e) {
       LOGGER.error("Illelagal access to declared field" + e.getMessage());
     }
-    if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) {
-      carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION,
-          CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-    }
 
     validateBlockletSize();
     validateNumCores();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9c164a..59f8cd8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -730,15 +730,6 @@ public final class CarbonUtil {
         .startsWith("file://") || lowerPath.startsWith(ALLUXIO_PREFIX);
   }
 
-  public static String getCarbonStorePath() {
-    CarbonProperties prop = CarbonProperties.getInstance();
-    if (null == prop) {
-      return null;
-    }
-    return prop.getProperty(CarbonCommonConstants.STORE_LOCATION,
-        CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-  }
-
   /**
    * This method will check the existence of a file at a given path
    */
@@ -1800,6 +1791,25 @@ public final class CarbonUtil {
   }
 
   /**
+   * Removes schema from properties
+   * @param properties
+   * @return
+   */
+  public static Map<String, String> removeSchemaFromMap(Map<String, String> properties) {
+    Map<String, String> newMap = new HashMap<>();
+    newMap.putAll(properties);
+    String partsNo = newMap.get("carbonSchemaPartsNo");
+    if (partsNo == null) {
+      return newMap;
+    }
+    int no = Integer.parseInt(partsNo);
+    for (int i = 0; i < no; i++) {
+      newMap.remove("carbonSchema" + i);
+    }
+    return newMap;
+  }
+
+  /**
    * This method will read the schema file from a given path
    *
    * @param schemaFilePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9912ec4..9926c57 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 class QueryTest extends PlanTest {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -85,7 +88,8 @@ class QueryTest extends PlanTest {
 
   val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
 
-  val storeLocation = TestQueryExecutor.storeLocation
+  lazy val storeLocation = CarbonProperties.getInstance().
+    getProperty(CarbonCommonConstants.STORE_LOCATION)
   val resourcesPath = TestQueryExecutor.resourcesPath
   val integrationPath = TestQueryExecutor.integrationPath
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 5b603aa..5cef14a 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -213,8 +213,10 @@ public final class CarbonLoaderUtil {
     String tempLocationKey = CarbonDataProcessorUtil
         .getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), isCompactionFlow);
     // form local store location
-    final String localStoreLocation = CarbonProperties.getInstance()
-        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey);
+    if (localStoreLocation == null) {
+      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+    }
     // submit local folder clean up in another thread so that main thread execution is not blocked
     ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 45719fc..dc37360 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -157,8 +157,9 @@ object CarbonStore {
   def isSegmentValid(
       dbName: String,
       tableName: String,
+      storePath: String,
       segmentId: String): Boolean = {
-    val identifier = AbsoluteTableIdentifier.from(dbName, tableName)
+    val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
     val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
         SegmentStatusManager(
           identifier).getValidAndInvalidSegments

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index b76bca3..149e3b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -53,7 +53,6 @@ object TestQueryExecutor {
   val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister]
   CarbonProperties.getInstance()
     .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
-    .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
     .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords")
   private def lookupQueryExecutor: Class[_] = {
     ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index b8ba9f7..f8275d1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -57,8 +57,8 @@ object CarbonThriftServer {
                   "Using default Value and proceeding")
         Thread.sleep(30000)
     }
-
-    val cc = new CarbonContext(sc, args.head)
+    val storePath = if (args.length > 0) args.head else null
+    val cc = new CarbonContext(sc, storePath)
 
     HiveThriftServer2.startWithContext(cc)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 1aeda95..da4b210 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -44,7 +44,7 @@ class CarbonContext(
 
   def this(sc: SparkContext) = {
     this(sc,
-      new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+      null,
       new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
   }
 
@@ -66,8 +66,14 @@ class CarbonContext(
 
   @transient
   override lazy val catalog = {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+    val carbonProperties = CarbonProperties.getInstance()
+    if (storePath != null) {
+      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+      // In case if it is in carbon.properties for backward compatible
+    } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+        conf.getConfString("spark.sql.warehouse.dir"))
+    }
     new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 2fc93e6..71ef6a6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -69,7 +69,8 @@ private[sql] case class CarbonDatasourceHadoopRelation(
       carbonTable.getDatabaseName,
       carbonTable.getFactTableName,
       CarbonSparkUtil.createSparkMeta(carbonTable),
-      new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
+      new TableMeta(carbonTable.getCarbonTableIdentifier,
+        paths.head, absIdentifier.getTablePath, carbonTable),
       None
     )(sqlContext)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index e8d3907..7790f59 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -236,7 +236,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
                   CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
                   val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
                   metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
+                    null, carbonTable)
                 }
               }
             })
@@ -281,7 +281,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
     tableInfo.setStorePath(storePath)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null,
       CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
 
     val fileType = FileFactory.getFileType(schemaMetadataPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index aba6891..34ac940 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -53,9 +53,9 @@ object CarbonThriftServer {
       System.setProperty("carbon.properties.filepath", sparkConf.get("carbon.properties.filepath"))
     }
 
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, args.head)
+    val storePath = if (args.length > 0) args.head else null
 
-    val spark = builder.getOrCreateCarbonSession(args.head)
+    val spark = builder.getOrCreateCarbonSession(storePath)
     val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
     try {
       Thread.sleep(Integer.parseInt(warmUpTime))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index d1d3015..de7f3fb 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -48,7 +48,8 @@ object CarbonSparkUtil {
   def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
     val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val table = CarbonTable.buildFromTableInfo(tableInfo)
-    val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table)
+    val meta = new TableMeta(identifier.getCarbonTableIdentifier,
+      identifier.getStorePath, tablePath, table)
     CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
       CarbonSparkUtil.createSparkMeta(table), meta)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 1054c62..805a421 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 33091aa..43f6a21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -67,7 +67,7 @@ case class CarbonDictionaryDecoder(
 
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
-      val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+      val storePath = CarbonEnv.getInstance(sparkSession).storePath
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -121,7 +121,7 @@ case class CarbonDictionaryDecoder(
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
 
-    val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d19eb39..9d10ea0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -40,6 +40,8 @@ class CarbonEnv {
 
   var carbonSessionInfo: CarbonSessionInfo = _
 
+  var storePath: String = _
+
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -61,10 +63,14 @@ class CarbonEnv {
       // add session params after adding DefaultCarbonParams
       config.addDefaultCarbonSessionParams()
       carbonMetastore = {
-        val storePath =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+        val properties = CarbonProperties.getInstance()
+        storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+        if (storePath == null) {
+          storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
+          properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+        }
         LOGGER.info(s"carbon env initial: $storePath")
-        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath)
+        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index b436891..7390cf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -65,7 +65,7 @@ object CarbonSession {
 
     def getOrCreateCarbonSession(): SparkSession = {
       getOrCreateCarbonSession(
-        new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+        null,
         new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
     }
 
@@ -145,9 +145,14 @@ object CarbonSession {
           }
           sc
         }
-
-        CarbonProperties.getInstance()
-          .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+        val carbonProperties = CarbonProperties.getInstance()
+        if (storePath != null) {
+          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+          // In case if it is in carbon.properties for backward compatible
+        } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+            sparkContext.conf.get("spark.sql.warehouse.dir"))
+        }
         session = new CarbonSession(sparkContext)
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 498ea03..bec163b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,15 +25,19 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.{CreateTable, TableModel, TableNewProcessor}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DecimalType, StructType}
+import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -146,7 +150,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
 
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
-      dataSchema: StructType): String = {
+      dataSchema: StructType) = {
 
     val dbName: String = parameters.getOrElse("dbName",
       CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
@@ -158,7 +162,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       } else {
         CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(Option(dbName), tableName)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName"
       }
     } catch {
       case ex: NoSuchTableException =>
@@ -168,7 +172,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
           dbName,
           tableName)
         CreateTable(cm, false).run(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        getPathForTable(sparkSession, dbName, tableName, parameters)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
     }
@@ -195,9 +199,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       if (parameters.contains("tablePath")) {
         parameters.get("tablePath").get
       } else {
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        relation.tableMeta.tablePath
       }
     } catch {
       case ex: Exception =>
@@ -234,22 +238,46 @@ object CarbonSource {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
-    if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) {
+    if (!properties.contains("carbonSchemaPartsNo")) {
       val dbName: String = properties.getOrElse("dbName",
         CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
       val tableName: String = properties.getOrElse("tableName", "").toLowerCase
       val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName)
       val tableInfo: TableInfo = TableNewProcessor(model)
-      val (tablePath, carbonSchemaString) =
-      metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession)
-      val map = CarbonUtil.convertToMultiStringMap(tableInfo)
+      val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName
+      val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+      schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+      tableInfo.getFactTable.getSchemaEvalution.
+        getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+      val map = if (metaStore.isReadFromHiveMetaStore) {
+        val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+        val schemaMetadataPath =
+          CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+        tableInfo.setMetaDataFilepath(schemaMetadataPath)
+        tableInfo.setStorePath(tableIdentifier.getStorePath)
+        CarbonUtil.convertToMultiStringMap(tableInfo)
+      } else {
+        metaStore.saveToDisk(tableInfo, tablePath)
+        new java.util.HashMap[String, String]()
+      }
       properties.foreach(e => map.put(e._1, e._2))
       map.put("tablePath", tablePath)
       // updating params
       val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
       tableDesc.copy(storage = updatedFormat)
     } else {
-      tableDesc
+      val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
+      if (!metaStore.isReadFromHiveMetaStore) {
+        // save to disk
+        metaStore.saveToDisk(tableInfo, properties.get("tablePath").get)
+        // remove schema string from map as we don't store carbon schema to hive metastore
+        val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
+        val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+        tableDesc.copy(storage = updatedFormat)
+      } else {
+        tableDesc
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 1cc6668..33bba8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -102,7 +102,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       CarbonAliasDecoderRelation(),
       rdd,
       output,
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath,
+      CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
       table.carbonTable.getTableInfo.serialize())
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 0d5a821..17e456d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
 import org.apache.spark.util.AlterTableUtil
 
@@ -29,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -167,11 +168,12 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       locks = AlterTableUtil
         .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
             sparkSession)
-      carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].tableMeta.carbonTable
+      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].tableMeta
+      carbonTable = tableMeta.carbonTable
       // get the latest carbon table and check for column existence
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath.
+        getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
       val tableMetadataFile = carbonTablePath.getPath
       val tableInfo: org.apache.carbondata.format.TableInfo =
         metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -196,7 +198,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
           carbonTable.getCarbonTableIdentifier,
           tableInfo,
           schemaEvolutionEntry,
-          carbonTable.getStorePath)(sparkSession)
+          tableMeta.tablePath)(sparkSession)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
         .runSqlHive(
@@ -206,6 +208,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
           s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
           s"('tableName'='$newTableName', " +
           s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+        Some(oldDatabaseName)).quotedString)
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 609f39b..2731104 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -41,8 +41,8 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
         CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
       }
     }
-    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession)
-      .carbonMetastore.storePath)
+    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase,
+      CarbonEnv.getInstance(sparkSession).storePath)
     rows
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 760cb06..18d2dc7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -62,8 +62,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       _, child: LogicalPlan, _, _) =>
         ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).
-          carbonMetastore.storePath)
+        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index cc18fa3..eeb2022 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -156,7 +156,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
     carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+    carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
 
     var storeLocation = CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
@@ -196,7 +196,9 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
     val tbName = cm.tableName
@@ -218,10 +220,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
     } else {
+      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val (tablePath, carbonSchemaString) =
-        catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+      val tablePath = tableIdentifier.getTablePath
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
       if (createDSTable) {
         try {
           val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
@@ -239,7 +242,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
             CarbonEnv.getInstance(sparkSession).carbonMetastore
-                .dropTable(catalog.storePath, identifier)(sparkSession)
+                .dropTable(tablePath, identifier)(sparkSession)
 
             LOGGER.audit(s"Table creation with Database name [$dbName] " +
                 s"and Table name [$tbName] failed")
@@ -332,9 +335,7 @@ object LoadTable {
       tableInfo, entry, carbonTablePath.getPath)(sparkSession)
 
     // update the schema modified time
-    metastore.updateAndTouchSchemasUpdatedTime(
-      carbonLoadModel.getDatabaseName,
-      carbonLoadModel.getTableName)
+    metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
 
     // update CarbonDataLoadSchema
     val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
@@ -526,7 +527,7 @@ case class LoadTable(
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+      carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
 
       val table = relation.tableMeta.carbonTable
       carbonLoadModel.setTableName(table.getFactTableName)
@@ -882,7 +883,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    catalog.checkSchemasModifiedTimeAndReloadTables()
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
+        dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
@@ -891,7 +895,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
 
       CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .dropTable(catalog.storePath, identifier)(sparkSession)
+          .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } catch {
       case ex: Exception =>
@@ -910,10 +914,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val metadataFilePath = CarbonStorePath
-        .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val metadataFilePath =
+      CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
     val fileType = FileFactory.getFileType(metadataFilePath)
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       val file = FileFactory.getCarbonFile(metadataFilePath, fileType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 549841b..2407054 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     System.nanoTime() + ""
   }
 
-  lazy val metadata = loadMetadata(storePath, nextQueryId)
+  val metadata = MetaData(new ArrayBuffer[TableMeta]())
 
 
   /**
@@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   override def createCarbonRelation(parameters: Map[String, String],
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation = {
-    lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
-      Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
-      .asInstanceOf[CarbonRelation]
+    val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
+    val tables = getTableFromMetadataCache(database, tableName)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableName,
+          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+      case None =>
+        readCarbonSchema(absIdentifier) match {
+          case Some(meta) =>
+            CarbonRelation(database, tableName,
+              CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+          case None =>
+            throw new NoSuchTableException(database, tableName)
+        }
+    }
   }
 
   def lookupRelation(dbName: Option[String], tableName: String)
@@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
 
-  def lookupRelation(tableIdentifier: TableIdentifier)
+  override def lookupRelation(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): LogicalPlan = {
-    checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase
-    )
-    val tables = getTableFromMetadata(database, tableIdentifier.table, true)
-    tables match {
-      case Some(t) =>
-        CarbonRelation(database, tableIdentifier.table,
-          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
-      case None =>
-        throw new NoSuchTableException(database, tableIdentifier.table)
+      sparkSession.catalog.currentDatabase)
+    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+      case SubqueryAlias(_,
+      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+      _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case LogicalRelation(
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
     }
+    relation
   }
 
   /**
@@ -123,8 +138,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    * @return
    */
-  def getTableFromMetadata(database: String,
-      tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
     metadata.tablesMeta
       .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
                  c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
@@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    tableExists(TableIdentifier(table, databaseOp))(sparkSession)
   }
 
-  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tables = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
-    tables.nonEmpty
-  }
-
-  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    // creating zookeeper instance once.
-    // if zookeeper is configured as carbon lock type.
-    val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
-    if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
-    }
-    if (metadataPath == null) {
-      return null
-    }
-    // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
-        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
-          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
-        )
-      LOGGER.info("Default lock type HDFSLOCK is configured")
+  override def tableExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    try {
+      lookupRelation(tableIdentifier)(sparkSession)
+    } catch {
+      case e: Exception =>
+        return false
     }
-    val fileType = FileFactory.getFileType(metadataPath)
-    val metaDataBuffer = new ArrayBuffer[TableMeta]
-    fillMetaData(metadataPath, fileType, metaDataBuffer)
-    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
-      System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    MetaData(metaDataBuffer)
+    true
   }
 
-  private def fillMetaData(basePath: String, fileType: FileType,
-      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
-    val databasePath = basePath // + "/schemas"
-    try {
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val file = FileFactory.getCarbonFile(databasePath, fileType)
-        val databaseFolders = file.listFiles()
-
-        databaseFolders.foreach(databaseFolder => {
-          if (databaseFolder.isDirectory) {
-            val dbName = databaseFolder.getName
-            val tableFolders = databaseFolder.listFiles()
-
-            tableFolders.foreach(tableFolder => {
-              if (tableFolder.isDirectory) {
-                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                  tableFolder.getName, UUID.randomUUID().toString)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
-                  carbonTableIdentifier)
-                val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
-                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-                  val tableName = tableFolder.getName
-                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-                  val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
-                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
-                  val wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
-                  val schemaFilePath = CarbonStorePath
-                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-                  wrapperTableInfo.setStorePath(storePath)
-                  wrapperTableInfo
-                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
-                }
-              }
-            })
-          }
-        })
-      } else {
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        s.printStackTrace()
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = identifier.getCarbonTableIdentifier.getTableName
+    val storePath = identifier.getStorePath
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
+      tableName.toLowerCase(), UUID.randomUUID().toString)
+    val carbonTablePath =
+      CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val fileType = FileFactory.getFileType(tableMetadataFile)
+    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+      val tableUniqueName = dbName + "_" + tableName
+      val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+      val schemaFilePath = CarbonStorePath
+        .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+      wrapperTableInfo.setStorePath(storePath)
+      wrapperTableInfo
+        .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+      val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
+        identifier.getStorePath,
+        identifier.getTablePath,
+        carbonTable)
+      metadata.tablesMeta += tableMeta
+      Some(tableMeta)
+    } else {
+      None
     }
   }
 
@@ -238,15 +201,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String) (sparkSession: SparkSession): String = {
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
@@ -255,11 +218,19 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
       .fromExternalToWrapperTableInfo(thriftTableInfo,
           newTableIdentifier.getDatabaseName,
           newTableIdentifier.getTableName,
-          carbonStorePath)
-    createSchemaThriftFile(wrapperTableInfo,
+        absoluteTableIdentifier.getStorePath)
+    val identifier =
+      new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        wrapperTableInfo.getFactTable.getTableId)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
+      identifier)
+    addTableCache(wrapperTableInfo,
+      AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
       newTableIdentifier.getDatabaseName,
-      newTableIdentifier.getTableName)(sparkSession)
+      newTableIdentifier.getTableName))
+    path
   }
 
   /**
@@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         carbonTableIdentifier.getDatabaseName,
         carbonTableIdentifier.getTableName,
-        carbonStorePath)
+        tableIdentifier.getStorePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    createSchemaThriftFile(wrapperTableInfo,
+    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      carbonTableIdentifier.getDatabaseName,
-      carbonTableIdentifier.getTableName)(sparkSession)
+      tableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, tableIdentifier)
+    path
   }
 
 
@@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * Load CarbonTable from wrapper tableInfo
    *
    */
-  def createTableFromThrift(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
-    if (tableExists(tableName, Some(dbName))(sparkSession)) {
-      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
-    }
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+  def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val dbName = tableInfo.getDatabaseName
+    val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-      .add(schemaEvolutionEntry)
-    val carbonTablePath = createSchemaThriftFile(tableInfo,
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    tableInfo.setStorePath(identifier.getStorePath)
+    createSchemaThriftFile(tableInfo,
       thriftTableInfo,
-      dbName,
-      tableName)(sparkSession)
+      identifier.getCarbonTableIdentifier)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    (carbonTablePath, "")
+  }
+
+  /**
+   * Generates schema string from TableInfo
+   */
+  override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+      tablePath: String): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(tableIdentifier.getStorePath)
+    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    addTableCache(tableInfo, tableIdentifier)
+    CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
   }
 
   /**
@@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param tableInfo
    * @param thriftTableInfo
-   * @param dbName
-   * @param tableName
-   * @param sparkSession
    * @return
    */
-  private def createSchemaThriftFile(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      dbName: String, tableName: String)
-    (sparkSession: SparkSession): String = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+  private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
+      thriftTableInfo: TableInfo,
+      carbonTableIdentifier: CarbonTableIdentifier): String = {
+    val carbonTablePath = CarbonStorePath.
+      getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    removeTableFromMetadata(dbName, tableName)
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+    carbonTablePath.getPath
+  }
+
+  protected def addTableCache(tableInfo: table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+    val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
+    CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
+    removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getTablePath,
+      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
     metadata.tablesMeta += tableMeta
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
   }
 
   /**
@@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
     metadataToBeRemoved match {
       case Some(tableMeta) =>
         metadata.tablesMeta -= tableMeta
         CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
       case None =>
-        LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        if (LOGGER.isDebugEnabled) {
+          LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        }
     }
   }
 
@@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
 
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = tableIdentifier.table.toLowerCase
-
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
-    val fileType = FileFactory.getFileType(tablePath)
-    FileFactory.isFileExist(tablePath, fileType)
+    try {
+      val tablePath = lookupRelation(tableIdentifier)(sparkSession).
+        asInstanceOf[CarbonRelation].tableMeta.tablePath
+      val fileType = FileFactory.getFileType(tablePath)
+      FileFactory.isFileExist(tablePath, fileType)
+    } catch {
+      case e: Exception =>
+        false
+    }
   }
 
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession) {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables
-
-      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
-        tableIdentifier.table)
-      metadataToBeRemoved match {
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-        case None =>
-          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
-      }
+      checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+
+      removeTableFromMetadata(dbName, tableName)
+      updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
     }
   }
 
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+  private def getTimestampFileAndType(basePath: String) = {
+    val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
     (timestampFile, timestampFileType)
   }
@@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     tableModifiedTimeStore.put("default", timeStamp)
   }
 
-  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+  def updateAndTouchSchemasUpdatedTime(basePath: String) {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
   }
 
-  /**
-   * This method will read the timestamp of empty schema file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-    } else {
-      System.currentTimeMillis()
-    }
-  }
 
   /**
    * This method will check and create an empty schema timestamp file
    *
-   * @param databaseName
-   * @param tableName
    * @return
    */
-  private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+  private def touchSchemaFileSystemTime(basePath: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      LOGGER.audit(s"Creating timestamp file for $basePath")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
     val systemTime = System.currentTimeMillis()
@@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     systemTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables() {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+  def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+    val (timestampFile, timestampFileType) =
+      getTimestampFileAndType(storePath)
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
@@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   }
 
   private def refreshCache() {
-    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+    metadata.tablesMeta.clear()
   }
 
   override def isReadFromHiveMetaStore: Boolean = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d8abbdf/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a8f92ce..c328130 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -16,21 +16,15 @@
  */
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
@@ -41,12 +35,10 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil
 /**
  * Metastore to store carbonschema in hive
  */
-class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
-  extends CarbonFileMetastore(conf, storePath) {
+class CarbonHiveMetaStore(conf: RuntimeConfig) extends CarbonFileMetastore(conf) {
 
   override def isReadFromHiveMetaStore: Boolean = true
 
-
   /**
    * Create spark session from paramters.
    *
@@ -61,7 +53,7 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
     if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
       val meta = new TableMeta(table.getCarbonTableIdentifier,
-        table.getStorePath, table)
+        absIdentifier.getStorePath, absIdentifier.getTablePath, table)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
         CarbonSparkUtil.createSparkMeta(table), meta)
     } else {
@@ -69,111 +61,30 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
     }
   }
 
-  override def lookupRelation(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): LogicalPlan = {
-    val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase)
-    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-      case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
-      _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case LogicalRelation(
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
-    }
-    relation
-  }
-
-  /**
-   * This method will search for a table in the catalog metadata
-   *
-   * @param database
-   * @param tableName
-   * @return
-   */
-  override def getTableFromMetadata(database: String,
-      tableName: String,
-      readStore: Boolean): Option[TableMeta] = {
-    if (!readStore) {
-      None
-    } else {
-      super.getTableFromMetadata(database, tableName, readStore)
-    }
-  }
-
-  override def tableExists(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): Boolean = {
-    try {
-      lookupRelation(tableIdentifier)(sparkSession)
-    } catch {
-      case e: Exception =>
-        return false
-    }
-    true
-  }
-
-  override def loadMetadata(metadataPath: String,
-      queryId: String): MetaData = {
-    MetaData(new ArrayBuffer[TableMeta])
-  }
-
-
-  /**
-   *
-   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
-   * Load CarbonTable from wrapper tableInfo
-   *
-   */
-  override def createTableFromThrift(tableInfo: TableInfo, dbName: String,
-      tableName: String)(sparkSession: SparkSession): (String, String) = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
-    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
-    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    removeTableFromMetadata(dbName, tableName)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    (carbonTablePath.getPath, CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ","))
-  }
-
-  /**
-   * This method will remove the table meta from catalog metadata array
-   *
-   * @param dbName
-   * @param tableName
-   */
-  override def removeTableFromMetadata(dbName: String,
-      tableName: String): Unit = {
-    // do nothing
-  }
 
   override def isTablePathExists(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): Boolean = {
     tableExists(tableIdentifier)(sparkSession)
   }
 
-  override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  override def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): Unit = {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
+    checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+    removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
   }
 
-  override def checkSchemasModifiedTimeAndReloadTables(): Unit = {
+  override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
     // do nothing now
   }
 
@@ -200,23 +111,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param carbonStorePath
    * @param sparkSession
    */
   override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
+      tablePath: String)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
     updateHiveMetaStore(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
-      carbonStorePath,
+      identifier.getStorePath,
       sparkSession,
       schemaConverter)
   }
@@ -232,23 +143,20 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
         carbonStorePath)
-    wrapperTableInfo.setStorePath(storePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier)
+    wrapperTableInfo.setStorePath(carbonStorePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
-    val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      wrapperTableInfo.getFactTable.getTableId)
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
       s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
-    removeTableFromMetadata(wrapperTableInfo.getDatabaseName,
-      wrapperTableInfo.getFactTable.getTableName)
+    removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath
+    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
   }
 
   /**
@@ -256,21 +164,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
    * @param sparkSession
    */
   override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      carbonStorePath: String)
+      tablePath: String)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
     updateHiveMetaStore(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
-      carbonStorePath,
+      identifier.getStorePath,
       sparkSession,
       schemaConverter)
   }
+
+
 }