You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/11 11:56:11 UTC

[2/3] carbondata git commit: [CARBONDATA-1284]Implement hive based schema storage in carbon

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
new file mode 100644
index 0000000..048681c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
+import org.apache.carbondata.core.fileoperations.FileWriteOperation
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.processing.merger.TableMeta
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * Mutable holder for the table metadata entries cached by a metastore.
+ *
+ * @param tablesMeta buffer of cached table entries; declared as a `var` so the whole buffer
+ *                   can be replaced in one assignment
+ */
+case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
+  /** Empties the buffer of cached table entries. */
+  def clear(): Unit = tablesMeta.clear()
+}
+
+/**
+ * Bundles a [[CarbonTable]] with two lists of names and a [[DictionaryMap]].
+ *
+ * @param dims          presumably the dimension column names - TODO confirm against the builder
+ * @param msrs          presumably the measure column names - TODO confirm against the builder
+ * @param carbonTable   the carbon table this metadata belongs to
+ * @param dictionaryMap Boolean flags keyed by name (see [[DictionaryMap]])
+ */
+case class CarbonMetaData(dims: Seq[String],
+    msrs: Seq[String],
+    carbonTable: CarbonTable,
+    dictionaryMap: DictionaryMap)
+
+/**
+ * Wrapper around a name-to-Boolean map whose lookups lower-case the requested name first.
+ *
+ * @param dictionaryMap the underlying map; lookups only match keys that are already lower case
+ */
+case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
+  /**
+   * @param name name to look up; it is lower-cased before the lookup
+   * @return the stored flag, or None when the lower-cased name is not a key of the map
+   */
+  def get(name: String): Option[Boolean] = dictionaryMap.get(name.toLowerCase)
+}
+
+class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+
+  // Logger registered under the name "org.apache.spark.sql.CarbonMetastoreCatalog".
+  @transient
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
+
+  // Holds the last schema-modification timestamp known to this instance. Only the
+  // default-database key is written here; it is seeded with the construction time.
+  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
+  tableModifiedTimeStore
+    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
+
+  // NOTE(review): not referenced anywhere in the visible part of this class.
+  private val nextId = new AtomicLong(0)
+
+  // Returns the current System.nanoTime() value rendered as a string.
+  def nextQueryId: String = {
+    System.nanoTime() + ""
+  }
+
+  // Table metadata cache; populated on first access by loadMetadata(storePath, ...).
+  lazy val metadata = loadMetadata(storePath, nextQueryId)
+
+
+  /**
+   * Creates the carbon relation for the table named by the given absolute identifier.
+   * The lookup is delegated to lookupRelation using the table and database names carried
+   * by the identifier.
+   *
+   * @param parameters    relation parameters; not used by this file based implementation
+   * @param absIdentifier identifier providing the table name and database name
+   * @param sparkSession  session used for the lookup
+   * @return the looked-up plan cast to CarbonRelation
+   */
+  override def createCarbonRelation(parameters: Map[String, String],
+      absIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): CarbonRelation = {
+    val carbonIdentifier = absIdentifier.getCarbonTableIdentifier
+    val sparkIdentifier =
+      TableIdentifier(carbonIdentifier.getTableName, Some(carbonIdentifier.getDatabaseName))
+    lookupRelation(sparkIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+  }
+
+  /**
+   * Convenience overload: wraps the names in a TableIdentifier and delegates to the
+   * TableIdentifier based lookupRelation.
+   *
+   * @param dbName       optional database name
+   * @param tableName    table name
+   * @param sparkSession session used for the lookup
+   */
+  def lookupRelation(dbName: Option[String], tableName: String)
+    (sparkSession: SparkSession): LogicalPlan = {
+    val identifier = TableIdentifier(table = tableName, database = dbName)
+    lookupRelation(identifier)(sparkSession)
+  }
+
+  /**
+   * Resolves a table identifier to a CarbonRelation using the cached table metadata.
+   * The cache is checked against the schema timestamp file first; a missing database name
+   * falls back to the session's current database.
+   *
+   * @param tableIdentifier table (and optional database) to resolve
+   * @param sparkSession    session providing the current database
+   * @throws NoSuchTableException when the metadata holds no entry for the table
+   */
+  def lookupRelation(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): LogicalPlan = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableMeta = getTableFromMetadata(database, tableIdentifier.table, readStore = true)
+      .getOrElse(throw new NoSuchTableException(database, tableIdentifier.table))
+    CarbonRelation(database, tableIdentifier.table,
+      CarbonSparkUtil.createSparkMeta(tableMeta.carbonTable), tableMeta)
+  }
+
+  /**
+   * Searches the cached catalog metadata for a table, ignoring case in both names.
+   *
+   * @param database  database name to match
+   * @param tableName table name to match
+   * @param readStore not used by this implementation
+   * @return the first matching entry, or None when there is none
+   */
+  def getTableFromMetadata(database: String,
+      tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+    metadata.tablesMeta.find { meta =>
+      val identifier = meta.carbonTableIdentifier
+      identifier.getDatabaseName.equalsIgnoreCase(database) &&
+      identifier.getTableName.equalsIgnoreCase(tableName)
+    }
+  }
+
+  /**
+   * Convenience overload: builds a TableIdentifier from the names and delegates to the
+   * TableIdentifier based tableExists.
+   *
+   * @param table        table name
+   * @param databaseOp   optional database name
+   * @param sparkSession session used for the check
+   */
+  def tableExists(
+      table: String,
+      databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
+    val identifier = TableIdentifier(table, databaseOp)
+    tableExists(identifier)(sparkSession)
+  }
+
+  /**
+   * Tells whether the cached metadata holds an entry for the table (names compared ignoring
+   * case). The cache is checked against the schema timestamp file first; a missing database
+   * name falls back to the session's current database.
+   */
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    metadata.tablesMeta.exists { meta =>
+      val identifier = meta.carbonTableIdentifier
+      identifier.getDatabaseName.equalsIgnoreCase(database) &&
+      identifier.getTableName.equalsIgnoreCase(tableIdentifier.table)
+    }
+  }
+
+  /**
+   * Builds the table metadata cache by scanning the store at metadataPath.
+   * Along the way it copies a configured zookeeper url into CarbonProperties, defaults the
+   * lock type to the HDFS lock when none is set and the store is on HDFS, records the schema
+   * timestamp and reports a LOAD_META driver statistic.
+   *
+   * @param metadataPath store location to scan
+   * @param queryId      id under which the driver statistic is recorded
+   * @return the loaded metadata, or null when metadataPath is null
+   */
+  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
+    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val statistic = new QueryStatistic()
+    // creating zookeeper instance once.
+    // if zookeeper is configured as carbon lock type.
+    Option(conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)).foreach { url =>
+      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, url)
+    }
+    if (metadataPath == null) {
+      null
+    } else {
+      // if no locktype is configured and store type is HDFS set HDFS lock as default
+      val lockTypeMissing =
+        null == CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE)
+      if (lockTypeMissing && FileType.HDFS == FileFactory.getFileType(metadataPath)) {
+        CarbonProperties.getInstance.addProperty(
+          CarbonCommonConstants.LOCK_TYPE,
+          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
+        LOGGER.info("Default lock type HDFSLOCK is configured")
+      }
+      val storeFileType = FileFactory.getFileType(metadataPath)
+      val loadedTables = new ArrayBuffer[TableMeta]
+      fillMetaData(metadataPath, storeFileType, loadedTables)
+      updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+      statistic.addStatistics(QueryStatisticsConstants.LOAD_META, System.currentTimeMillis())
+      recorder.recordStatisticsForDriver(statistic, queryId)
+      MetaData(loadedTables)
+    }
+  }
+
+  /**
+   * Scans the two-level folder layout (database folder / table folder) under basePath.
+   * Every table whose schema file exists is loaded into CarbonMetadata, and a TableMeta for
+   * it is appended to metaDataBuffer. When basePath does not exist, or a
+   * FileNotFoundException is raised during the scan, the folder is created instead.
+   *
+   * @param basePath       store location to scan
+   * @param fileType       file system type of basePath
+   * @param metaDataBuffer buffer receiving one entry per loaded table
+   */
+  private def fillMetaData(basePath: String, fileType: FileType,
+      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+    val databasePath = basePath // + "/schemas"
+    try {
+      if (FileFactory.isFileExist(databasePath, fileType)) {
+        val databaseFolders = FileFactory.getCarbonFile(databasePath, fileType).listFiles()
+        for {
+          databaseFolder <- databaseFolders if databaseFolder.isDirectory
+          tableFolder <- databaseFolder.listFiles() if tableFolder.isDirectory
+        } {
+          val dbName = databaseFolder.getName
+          val tableName = tableFolder.getName
+          // A random UUID stands in for the table id of this identifier.
+          // NOTE(review): presumably the id does not take part in path resolution - confirm.
+          val carbonTableIdentifier =
+            new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString)
+          val tableMetadataFile =
+            CarbonStorePath.getCarbonTablePath(basePath, carbonTableIdentifier).getSchemaFilePath
+
+          if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+            val tableUniqueName = dbName + "_" + tableName
+            val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+            val wrapperTableInfo = new ThriftWrapperSchemaConverterImpl()
+              .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+            val schemaFilePath = CarbonStorePath
+              .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+            wrapperTableInfo.setStorePath(storePath)
+            wrapperTableInfo
+              .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+            CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+            val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+            metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+              carbonTable)
+          }
+        }
+      } else {
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Report through the class logger instead of printing the stack trace to stderr.
+        LOGGER.error(s, s"Store path $databasePath not found while loading table metadata")
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+    }
+  }
+
+  /**
+   * Overwrites the stored schema of a table with the given thrift table info and reloads
+   * the cached metadata for it.
+   *
+   * @param newTableIdentifier   identifier whose database and table name the schema is
+   *                             written under
+   * @param oldTableIdentifier   not used by this file based implementation
+   * @param thriftTableInfo      schema to write; the evolution entry is appended to its history
+   * @param schemaEvolutionEntry entry to append, skipped when null
+   * @param carbonStorePath      store path handed to the schema converter
+   * @param sparkSession         active session
+   * @return the path returned by createSchemaThriftFile for the table
+   */
+  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    Option(schemaEvolutionEntry).foreach { entry =>
+      thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(entry)
+    }
+    val databaseName = newTableIdentifier.getDatabaseName
+    val tableName = newTableIdentifier.getTableName
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo, databaseName, tableName, carbonStorePath)
+    createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, databaseName, tableName)(
+      sparkSession)
+  }
+
+  /**
+   * Removes the latest schema evolution entry after a failed schema change and rewrites the
+   * schema file from the reverted thrift table info.
+   *
+   * The entry is dropped before the thrift info is converted, so the wrapper table info that
+   * is loaded into the metadata cache matches what is written to the schema file.
+   *
+   * @param carbonTableIdentifier identifier of the table being reverted
+   * @param thriftTableInfo       schema whose last evolution entry is removed (mutated in place)
+   * @param carbonStorePath       store path handed to the schema converter
+   * @param sparkSession          active session
+   * @return the path returned by createSchemaThriftFile for the table
+   */
+  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    // An empty history has nothing to revert; remove(-1) would throw.
+    if (!evolutionEntries.isEmpty) {
+      evolutionEntries.remove(evolutionEntries.size() - 1)
+    }
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName,
+        carbonTableIdentifier.getTableName,
+        carbonStorePath)
+    createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      carbonTableIdentifier.getDatabaseName,
+      carbonTableIdentifier.getTableName)(sparkSession)
+  }
+
+
+
+  /**
+   * Creates a new table: converts the wrapper TableInfo to its thrift form, appends an
+   * initial schema evolution entry, writes the schema file and loads the table metadata.
+   *
+   * @param tableInfo    wrapper table info of the table to create
+   * @param dbName       database name
+   * @param tableName    table name
+   * @param sparkSession active session
+   * @return the table path paired with an empty string
+   */
+  def createTableFromThrift(
+      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+      dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
+    val alreadyPresent = tableExists(tableName, Some(dbName))(sparkSession)
+    if (alreadyPresent) {
+      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+    }
+    val initialEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+    val thriftTableInfo = new ThriftWrapperSchemaConverterImpl()
+      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
+    val evolutionHistory =
+      thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+    evolutionHistory.add(initialEntry)
+    val tablePath =
+      createSchemaThriftFile(tableInfo, thriftTableInfo, dbName, tableName)(sparkSession)
+    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
+    (tablePath, "")
+  }
+
+  /**
+   * Writes the thrift schema file of a table into the carbon store, then replaces the cached
+   * metadata entry of that table and touches the schema timestamp file.
+   *
+   * @param tableInfo       wrapper table info; its metadata file path and store path are set
+   * @param thriftTableInfo thrift table info written to the schema file
+   * @param dbName          database name
+   * @param tableName       table name
+   * @param sparkSession    active session
+   * @return the path of the table in the store
+   */
+  private def createSchemaThriftFile(
+      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      dbName: String, tableName: String)
+    (sparkSession: SparkSession): String = {
+    val identifier =
+      new CarbonTableIdentifier(dbName, tableName, tableInfo.getFactTable.getTableId)
+    val tablePath = CarbonStorePath.getCarbonTablePath(storePath, identifier)
+    val schemaFile = tablePath.getSchemaFilePath
+    val metadataFolder = CarbonTablePath.getFolderContainingFile(schemaFile)
+    tableInfo.setMetaDataFilepath(metadataFolder)
+    tableInfo.setStorePath(storePath)
+
+    // make sure the metadata folder exists before the schema file is written into it
+    val folderType = FileFactory.getFileType(metadataFolder)
+    val folderMissing = !FileFactory.isFileExist(metadataFolder, folderType)
+    if (folderMissing) {
+      FileFactory.mkdirs(metadataFolder, folderType)
+    }
+
+    // NOTE(review): the writer is not closed when write() throws; whether closing after a
+    // failed write is safe depends on ThriftWriter.close() - confirm before changing.
+    val schemaWriter = new ThriftWriter(schemaFile, false)
+    schemaWriter.open(FileWriteOperation.OVERWRITE)
+    schemaWriter.write(thriftTableInfo)
+    schemaWriter.close()
+
+    // swap the cached entry of this table for a freshly loaded one
+    removeTableFromMetadata(dbName, tableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    val reloadedTable = CarbonMetadata.getInstance().getCarbonTable(s"${dbName}_$tableName")
+    metadata.tablesMeta += new TableMeta(identifier, storePath, reloadedTable)
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+    tablePath.getPath
+  }
+
+  /**
+   * Drops the entry of a table from the cached catalog metadata and from CarbonMetadata.
+   * When no entry is cached only a debug message is logged.
+   *
+   * @param dbName    database name
+   * @param tableName table name
+   */
+  def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
+    val cachedEntry = getTableFromMetadata(dbName, tableName)
+    if (cachedEntry.isDefined) {
+      metadata.tablesMeta -= cachedEntry.get
+      CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+    } else {
+      LOGGER.debug(s"No entry for table $tableName in database $dbName")
+    }
+  }
+
+  /**
+   * Loads the wrapper table info into CarbonMetadata and points every cached entry with the
+   * same unique table name at the freshly loaded CarbonTable.
+   *
+   * @param wrapperTableInfo table info to load
+   */
+  private def updateMetadataByWrapperTable(
+      wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    val uniqueName = wrapperTableInfo.getTableUniqueName
+    val reloadedTable = CarbonMetadata.getInstance().getCarbonTable(uniqueName)
+    metadata.tablesMeta.foreach { meta =>
+      if (uniqueName.equals(meta.carbonTableIdentifier.getTableUniqueName)) {
+        meta.carbonTable = reloadedTable
+      }
+    }
+  }
+
+  /**
+   * Stamps the first schema evolution entry of the thrift table info with the current time,
+   * converts it to the wrapper form and refreshes the cached metadata with it.
+   *
+   * @param schemaFilePath path of the schema file; its parent folder becomes the metadata path
+   * @param tableInfo      thrift table info (its first evolution entry is mutated)
+   * @param dbName         database name
+   * @param tableName      table name
+   * @param storePath      store path set on the converted table info
+   */
+  def updateMetadataByThriftTable(schemaFilePath: String,
+      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+    val firstEvolutionEntry =
+      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+    firstEvolutionEntry.setTime_stamp(System.currentTimeMillis())
+
+    val wrapperTableInfo = new ThriftWrapperSchemaConverterImpl()
+      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+    val metadataFolder = CarbonTablePath.getFolderContainingFile(schemaFilePath)
+    wrapperTableInfo.setMetaDataFilepath(metadataFolder)
+    wrapperTableInfo.setStorePath(storePath)
+    updateMetadataByWrapperTable(wrapperTableInfo)
+  }
+
+
+  /**
+   * Tells whether the folder of the table exists in the store. The table name is lower-cased
+   * before the path is built; a missing database name falls back to the current database.
+   */
+  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = tableIdentifier.table.toLowerCase
+    val identifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath, identifier).getPath
+    FileFactory.isFileExist(tablePath, FileFactory.getFileType(tablePath))
+  }
+
+  /**
+   * Drops a table: clears its driver B-tree and dictionary caches and, when its metadata
+   * folder exists, deletes the table folder, removes the cached metadata entry, drops the
+   * table from the hive catalog and refreshes the session catalog.
+   *
+   * @param tableStorePath  store path under which the table folder lives
+   * @param tableIdentifier table to drop; a missing database name falls back to the session's
+   *                        current database, as in lookupRelation and tableExists
+   * @param sparkSession    active session
+   */
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Unit = {
+    // database.get would throw NoSuchElementException for an unqualified identifier
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = tableIdentifier.table
+
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    if (null != carbonTable) {
+      // clear driver B-tree and dictionary cache
+      ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
+    }
+    val fileType = FileFactory.getFileType(metadataFilePath)
+
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      // while drop we should refresh the schema modified time so that if any thing has changed
+      // in the other beeline need to update.
+      checkSchemasModifiedTimeAndReloadTables()
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+      getTableFromMetadata(dbName, tableName) match {
+        case Some(tableMeta) =>
+          metadata.tablesMeta -= tableMeta
+          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+        case None =>
+          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
+      }
+      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+      // discard cached table info in cachedDataSourceTables
+      sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+    }
+  }
+
+  /**
+   * Returns the path of the schema timestamp file under the store path together with its
+   * file system type. Both parameters are ignored; the same file is used for every table.
+   */
+  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+    val path = s"$storePath/${CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE}"
+    (path, FileFactory.getFileType(path))
+  }
+
+  /**
+   * Stores the given schema timestamp in the table modified time store. It is written under
+   * the same default-database key that checkSchemasModifiedTimeAndReloadTables reads.
+   *
+   * @param timeStamp timestamp to remember
+   */
+  private def updateSchemasUpdatedTime(timeStamp: Long): Unit = {
+    // use the shared constant rather than a literal so writer and reader cannot drift apart
+    tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
+  }
+
+  /**
+   * Touches the schema timestamp file and remembers the resulting timestamp.
+   *
+   * @param databaseName database name, used for the audit message only
+   * @param tableName    table name, used for the audit message only
+   */
+  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String): Unit = {
+    val touchedAt = touchSchemaFileSystemTime(databaseName, tableName)
+    updateSchemasUpdatedTime(touchedAt)
+  }
+
+  /**
+   * Reads the last modified time of the schema timestamp file.
+   *
+   * @param databaseName passed through to getTimestampFileAndType
+   * @param tableName    passed through to getTimestampFileAndType
+   * @return the file's last modified time, or the current time when the file does not exist
+   */
+  private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (path, pathType) = getTimestampFileAndType(databaseName, tableName)
+    if (!FileFactory.isFileExist(path, pathType)) {
+      System.currentTimeMillis()
+    } else {
+      FileFactory.getCarbonFile(path, pathType).getLastModifiedTime
+    }
+  }
+
+  /**
+   * Creates the schema timestamp file when it is missing and sets its last modified time
+   * to the current time.
+   *
+   * @param databaseName database name, used in the audit message
+   * @param tableName    table name, used in the audit message
+   * @return the timestamp that was written to the file
+   */
+  private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (path, pathType) = getTimestampFileAndType(databaseName, tableName)
+    val fileMissing = !FileFactory.isFileExist(path, pathType)
+    if (fileMissing) {
+      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      FileFactory.createNewFile(path, pathType)
+    }
+    val now = System.currentTimeMillis()
+    FileFactory.getCarbonFile(path, pathType).setLastModifiedTime(now)
+    now
+  }
+
+  /**
+   * Reloads the cached table metadata when the schema timestamp file exists and its last
+   * modified time differs from the timestamp remembered by this instance.
+   */
+  def checkSchemasModifiedTimeAndReloadTables(): Unit = {
+    val (path, pathType) = getTimestampFileAndType("", "")
+    if (FileFactory.isFileExist(path, pathType)) {
+      val onDisk: Long = FileFactory.getCarbonFile(path, pathType).getLastModifiedTime
+      val remembered: Long =
+        tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+      if (onDisk != remembered) {
+        refreshCache()
+      }
+    }
+  }
+
+  /** Replaces the cached table entries with a fresh scan of the store. */
+  private def refreshCache(): Unit = {
+    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+  }
+
+  // Always false for this file based metastore.
+  override def isReadFromHiveMetaStore: Boolean = false
+
+  /** Returns the CarbonTable of every entry currently held in the metadata cache. */
+  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
+    metadata.tablesMeta.map(meta => meta.carbonTable)
+  }
+
+  /**
+   * Reads the thrift table info from the schema file of the given table path.
+   *
+   * @param tablePath    table path whose schema file is read
+   * @param sparkSession not used by this file based implementation
+   */
+  override def getThriftTableInfo(tablePath: CarbonTablePath)
+    (sparkSession: SparkSession): TableInfo = {
+    CarbonUtil.readSchemaFile(tablePath.getSchemaFilePath)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
new file mode 100644
index 0000000..03d0bde
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.format
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.processing.merger.TableMeta
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * Metastore to store carbonschema in hive
+ */
+class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
+  extends CarbonFileMetastore(conf, storePath) {
+
+  // Always true for this hive based metastore.
+  override def isReadFromHiveMetaStore: Boolean = true
+
+
+  /**
+   * Creates the carbon relation from the table info serialized in the relation parameters.
+   * When the parameters yield no table info the file based lookup of the parent is used.
+   *
+   * @param parameters    relation parameters handed to CarbonUtil.convertGsonToTableInfo
+   * @param absIdentifier identifier used by the parent implementation as fallback
+   * @param sparkSession  active session
+   */
+  override def createCarbonRelation(parameters: Map[String, String],
+      absIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): CarbonRelation = {
+    Option(CarbonUtil.convertGsonToTableInfo(parameters.asJava)) match {
+      case Some(info) =>
+        val table = CarbonTable.buildFromTableInfo(info)
+        val meta = new TableMeta(table.getCarbonTableIdentifier, table.getStorePath, table)
+        CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
+          CarbonSparkUtil.createSparkMeta(table), meta)
+      case None =>
+        super.createCarbonRelation(parameters, absIdentifier, sparkSession)
+    }
+  }
+
+  /**
+   * Resolves the identifier through the session catalog and unwraps the carbon relation
+   * from the resulting logical relation (optionally wrapped in a SubqueryAlias).
+   *
+   * @throws NoSuchTableException when the catalog returns any other kind of plan
+   */
+  override def lookupRelation(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): LogicalPlan = {
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+      case SubqueryAlias(_,
+      LogicalRelation(hadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+      _) =>
+        hadoopRelation.carbonRelation
+      case LogicalRelation(hadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        hadoopRelation.carbonRelation
+      case _ =>
+        throw new NoSuchTableException(database, tableIdentifier.table)
+    }
+  }
+
+  /**
+   * Searches the catalog metadata for a table, but only when the caller asks for the store
+   * to be read.
+   *
+   * @param database  database name
+   * @param tableName table name
+   * @param readStore when false the lookup is skipped and None is returned
+   * @return the parent's lookup result when readStore is true, otherwise None
+   */
+  override def getTableFromMetadata(database: String,
+      tableName: String,
+      readStore: Boolean): Option[TableMeta] = {
+    if (readStore) {
+      super.getTableFromMetadata(database, tableName, readStore)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * A table exists when lookupRelation resolves it; any Exception raised by the lookup is
+   * treated as "does not exist".
+   */
+  override def tableExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    try {
+      lookupRelation(tableIdentifier)(sparkSession)
+      true
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /** Returns an empty metadata holder; both parameters are ignored. */
+  override def loadMetadata(metadataPath: String,
+      queryId: String): MetaData = {
+    val noTables = ArrayBuffer.empty[TableMeta]
+    MetaData(noTables)
+  }
+
+
+  /**
+   * Prepares the wrapper TableInfo for a new table: sets its metadata and store paths,
+   * appends an initial schema evolution entry and loads it into CarbonMetadata.
+   * No schema file is written by this implementation.
+   *
+   * @return the table path paired with the table info serialized by
+   *         CarbonUtil.convertToMultiGsonStrings
+   */
+  override def createTableFromThrift(tableInfo: TableInfo, dbName: String,
+      tableName: String)(sparkSession: SparkSession): (String, String) = {
+    val identifier =
+      new CarbonTableIdentifier(dbName, tableName, tableInfo.getFactTable.getTableId)
+    val tablePath = CarbonStorePath.getCarbonTablePath(storePath, identifier)
+    val metadataFolder = CarbonTablePath.getFolderContainingFile(tablePath.getSchemaFilePath)
+    tableInfo.setMetaDataFilepath(metadataFolder)
+    tableInfo.setStorePath(storePath)
+
+    val initialEntry = new schema.SchemaEvolutionEntry
+    initialEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(initialEntry)
+
+    removeTableFromMetadata(dbName, tableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    val serializedSchema = CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
+    (tablePath.getPath, serializedSchema)
+  }
+
+  /**
+   * Intentionally a no-op in this hive based metastore.
+   *
+   * @param dbName    ignored
+   * @param tableName ignored
+   */
+  override def removeTableFromMetadata(dbName: String,
+      tableName: String): Unit = ()
+
+  /** Answers with tableExists; no file system path is inspected here. */
+  override def isTablePathExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    val exists = tableExists(tableIdentifier)(sparkSession)
+    exists
+  }
+
+  /**
+   * Drops a table: clears its driver B-tree and dictionary caches, deletes the table folder
+   * when its metadata folder exists, drops the table from the hive catalog and refreshes
+   * the session catalog.
+   *
+   * @param tableStorePath  store path under which the table folder lives
+   * @param tableIdentifier table to drop; a missing database name falls back to the session's
+   *                        current database, as in lookupRelation
+   * @param sparkSession    active session
+   */
+  override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Unit = {
+    // database.get would throw NoSuchElementException for an unqualified identifier
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = tableIdentifier.table
+
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    if (null != carbonTable) {
+      // clear driver B-tree and dictionary cache
+      ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
+    }
+    val fileType = FileFactory.getFileType(metadataFilePath)
+
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      // remove the whole table folder (the parent of the metadata folder)
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+    }
+    CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+    // discard cached table info in cachedDataSourceTables
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  /** Intentionally a no-op for now in this hive based metastore. */
+  override def checkSchemasModifiedTimeAndReloadTables(): Unit = ()
+
+  /** Not implemented yet (Todo); always returns an empty sequence. */
+  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = Seq()
+
+  /**
+   * Builds the thrift table info of a table by looking up its relation and converting the
+   * wrapper table info of the resolved CarbonTable.
+   *
+   * @param tablePath    table path carrying the identifier of the table
+   * @param sparkSession session used for the lookup
+   */
+  override def getThriftTableInfo(tablePath: CarbonTablePath)
+    (sparkSession: SparkSession): format.TableInfo = {
+    val identifier = tablePath.getCarbonTableIdentifier
+    val sparkIdentifier =
+      TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))
+    val relation = lookupRelation(sparkIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val carbonTable = relation.metaData.carbonTable
+    new ThriftWrapperSchemaConverterImpl().fromWrapperToExternalTableInfo(
+      carbonTable.getTableInfo,
+      carbonTable.getDatabaseName,
+      carbonTable.getFactTableName)
+  }
+
+  /**
+   * Appends the given schema evolution entry (when there is one) to the history of the thrift
+   * table info and stores the resulting schema through `updateHiveMetaStore`.
+   *
+   * @param newTableIdentifier   identifier whose database and table name are used to convert the
+   *                             thrift table info
+   * @param oldTableIdentifier   identifier whose database and table name address the table
+   *                             that gets altered
+   * @param thriftTableInfo      thrift schema to store
+   * @param schemaEvolutionEntry entry to append to the evolution history; nothing is appended
+   *                             when it is null
+   * @param carbonStorePath      store path handed over to the schema conversion
+   * @param sparkSession         active spark session
+   * @return the path returned by `updateHiveMetaStore`
+   */
+  override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    Option(schemaEvolutionEntry).foreach { entry =>
+      thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(entry)
+    }
+    updateHiveMetaStore(newTableIdentifier,
+      oldTableIdentifier,
+      thriftTableInfo,
+      carbonStorePath,
+      sparkSession,
+      new ThriftWrapperSchemaConverterImpl)
+  }
+
+  /**
+   * Converts the thrift table info into the wrapper table info, stores its serialized form as
+   * SERDEPROPERTIES of the table through an `ALTER TABLE` statement, refreshes the table in the
+   * spark catalog and reloads it in `CarbonMetadata`.
+   *
+   * The wrapper table info is converted with the database and table name of
+   * `newTableIdentifier`, whereas the `ALTER TABLE` statement, the catalog refresh and the
+   * returned path use the database and table name of `oldTableIdentifier`.
+   *
+   * NOTE(review): `carbonStorePath` is only handed to the schema converter; every other path is
+   * resolved from `storePath`, which is not a parameter of this method - confirm both always
+   * point to the same location.
+   *
+   * @param newTableIdentifier identifier providing the names used for the schema conversion
+   * @param oldTableIdentifier identifier providing the names of the table that is altered
+   * @param thriftTableInfo    thrift schema to store
+   * @param carbonStorePath    store path passed to the schema converter
+   * @param sparkSession       active spark session
+   * @param schemaConverter    converter used to build the wrapper table info
+   * @return path of the carbon table resolved from `storePath` and an identifier made of the old
+   *         database and table name and the table id of the converted table info
+   */
+  private def updateHiveMetaStore(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      carbonStorePath: String,
+      sparkSession: SparkSession,
+      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        carbonStorePath)
+    wrapperTableInfo.setStorePath(storePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val dbName = oldTableIdentifier.getDatabaseName
+    val tableName = oldTableIdentifier.getTableName
+    // old names combined with the table id of the freshly converted table info
+    val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName,
+      wrapperTableInfo.getFactTable.getTableId)
+    // serialized table info that is spliced into the SERDEPROPERTIES clause below
+    val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
+    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
+      s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
+    sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
+    // drop the cached entry before loading the updated table info into CarbonMetadata
+    removeTableFromMetadata(wrapperTableInfo.getDatabaseName,
+      wrapperTableInfo.getFactTable.getTableName)
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath
+  }
+
+  /**
+   * This method is used to remove the latest schema evolution entry in case of failure and to
+   * store the resulting schema again through `updateHiveMetaStore`.
+   *
+   * @param carbonTableIdentifier identifier of the table, used both as new and as old identifier
+   * @param thriftTableInfo       thrift schema whose last evolution entry is removed
+   * @param carbonStorePath       store path handed over to the schema conversion
+   * @param sparkSession          active spark session
+   * @return the path returned by `updateHiveMetaStore`
+   */
+  override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    // an empty history has nothing to remove; without this guard remove(-1) would throw and
+    // hide the failure that triggered the revert
+    if (!evolutionEntries.isEmpty) {
+      evolutionEntries.remove(evolutionEntries.size() - 1)
+    }
+    updateHiveMetaStore(carbonTableIdentifier,
+      carbonTableIdentifier,
+      thriftTableInfo,
+      carbonStorePath,
+      sparkSession,
+      schemaConverter)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
new file mode 100644
index 0000000..ab27b4f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.processing.merger.TableMeta
+
+/**
+ * Interface for the carbon metastore. The implementation to use is chosen by
+ * `CarbonMetaStoreFactory`.
+ */
+trait CarbonMetaStore {
+
+  /**
+   * Looks up the relation of a table given by an optional database name and a table name.
+   */
+  def lookupRelation(dbName: Option[String], tableName: String)
+    (sparkSession: SparkSession): LogicalPlan
+
+  /**
+   * Looks up the relation of a table given by a spark table identifier.
+   */
+  def lookupRelation(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): LogicalPlan
+
+  /**
+   * Create a carbon relation from parameters.
+   * @param parameters
+   * @param absIdentifier
+   * @param sparkSession
+   */
+  def createCarbonRelation(parameters: Map[String, String],
+      absIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): CarbonRelation
+
+  /**
+   * Get table meta
+   * TODO remove it if possible
+   * @param database
+   * @param tableName
+   * @param readStore
+   * @return
+   */
+  def getTableFromMetadata(database: String,
+      tableName: String,
+      readStore: Boolean = false): Option[TableMeta]
+
+  def tableExists(
+    table: String,
+    databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean
+
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
+
+  def loadMetadata(metadataPath: String, queryId: String): MetaData
+
+  /**
+   * This method will overwrite the existing schema and update it with the given details
+   *
+   * @param newTableIdentifier
+   * @param oldTableIdentifier
+   * @param thriftTableInfo
+   * @param schemaEvolutionEntry
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      carbonStorePath: String)(sparkSession: SparkSession): String
+
+  /**
+   * This method is used to remove the evolution entry in case of failure.
+   *
+   * @param carbonTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String
+
+  /**
+   *
+   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
+   * Load CarbonTable from wrapper tableInfo
+   *
+   * NOTE(review): the meaning of the two returned strings is not visible from this
+   * declaration - confirm against the implementations.
+   */
+  def createTableFromThrift(tableInfo: table.TableInfo,
+      dbName: String,
+      tableName: String)(sparkSession: SparkSession): (String, String)
+
+  /**
+   * This method will remove the table meta from catalog metadata array
+   *
+   * @param dbName
+   * @param tableName
+   */
+  def removeTableFromMetadata(dbName: String, tableName: String): Unit
+
+  def updateMetadataByThriftTable(schemaFilePath: String,
+      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit
+
+  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
+
+  // explicit Unit result types replace the deprecated procedure syntax; the declared
+  // type is unchanged, so existing implementations keep compiling
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Unit
+
+  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String): Unit
+
+  def checkSchemasModifiedTimeAndReloadTables(): Unit
+
+  def isReadFromHiveMetaStore: Boolean
+
+  def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
+
+  def storePath: String
+
+  def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
+
+}
+
+/**
+ * Factory for Carbon metastore
+ */
+object CarbonMetaStoreFactory {
+
+  /**
+   * Creates a `CarbonHiveMetaStore` when `readSchemaFromHive` yields true for the given
+   * configuration, otherwise a `CarbonFileMetastore`.
+   *
+   * @param conf      runtime configuration of the session
+   * @param storePath store path passed to the created metastore
+   */
+  def createCarbonMetaStore(conf: RuntimeConfig, storePath: String): CarbonMetaStore = {
+    if (readSchemaFromHive(conf)) {
+      new CarbonHiveMetaStore(conf, storePath)
+    } else {
+      new CarbonFileMetastore(conf, storePath)
+    }
+  }
+
+  /**
+   * Resolves the `ENABLE_HIVE_SCHEMA_META_STORE` setting. The first source that provides a
+   * value wins: the runtime configuration, then the JVM system properties, then the carbon
+   * properties (which fall back to `ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT`).
+   *
+   * @param conf runtime configuration of the session
+   * @return the resolved value converted with `toBoolean`
+   */
+  def readSchemaFromHive(conf: RuntimeConfig): Boolean = {
+    val key = CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE
+    val sessionValue = if (conf.contains(key)) Some(conf.get(key)) else None
+    sessionValue
+      .orElse(Option(System.getProperty(key)))
+      .getOrElse(CarbonProperties.getInstance().getProperty(key,
+        CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT))
+      .toBoolean
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
deleted file mode 100644
index 04a94ce..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ /dev/null
@@ -1,960 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io._
-import java.util.{GregorianCalendar, LinkedHashSet, UUID}
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.util.parsing.combinator.RegexParsers
-
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
-import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
-import org.apache.carbondata.core.reader.ThriftReader
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.writer.ThriftWriter
-import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.CarbonSparkUtil
-
-/**
- * Holder of the table metadata entries kept by the metastore.
- *
- * @param tablesMeta buffer of the table metadata entries
- */
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
-
-  /** Removes every entry from the `tablesMeta` buffer. */
-  def clear(): Unit = tablesMeta.clear()
-}
-
-/**
- * Metadata of a carbon table as handed to the spark side.
- *
- * @param dims          presumably the names of the dimension columns - TODO confirm
- * @param msrs          presumably the names of the measure columns - TODO confirm
- * @param carbonTable   the carbon table this metadata belongs to
- * @param dictionaryMap lookup of a boolean flag per (lower-cased) name, see `DictionaryMap`
- */
-case class CarbonMetaData(dims: Seq[String],
-    msrs: Seq[String],
-    carbonTable: CarbonTable,
-    dictionaryMap: DictionaryMap)
-
-object CarbonMetastore {
-
-  /**
-   * Reads the thrift `TableInfo` from the given schema file. The reader is closed in any
-   * case, also when opening or reading fails.
-   *
-   * @param schemaFilePath path of the schema file to read
-   * @return the table info read from the file
-   */
-  def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
-    val tableInfoCreator = new ThriftReader.TBaseCreator() {
-      override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-        new TableInfo()
-      }
-    }
-    val reader = new ThriftReader(schemaFilePath, tableInfoCreator)
-    try {
-      reader.open()
-      reader.read().asInstanceOf[TableInfo]
-    } finally {
-      reader.close()
-    }
-  }
-
-  /**
-   * Writes the thrift `TableInfo` to the given schema file. The writer is closed in any
-   * case, also when opening or writing fails.
-   *
-   * @param schemaFilePath path of the schema file to write
-   * @param tableInfo      table info to write
-   */
-  def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
-    val writer = new ThriftWriter(schemaFilePath, false)
-    try {
-      writer.open()
-      writer.write(tableInfo)
-    } finally {
-      writer.close()
-    }
-  }
-
-}
-
-/**
- * Lookup of a boolean flag per name; names are lower-cased before the lookup.
- *
- * @param dictionaryMap flags keyed by name; the keys are expected to be lower case already,
- *                      since only the queried name is converted
- */
-case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
-
-  /** Returns the flag stored under the lower-cased `name`, if there is one. */
-  def get(name: String): Option[Boolean] = dictionaryMap.get(name.toLowerCase)
-}
-
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
-
-  // logger registered under the name "org.apache.spark.sql.CarbonMetastoreCatalog"
-  @transient
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
-
-  // timestamps keyed by name; seeded with the current time under the default database name
-  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
-  tableModifiedTimeStore
-    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
-
-  // NOTE(review): not referenced in the part of the class visible here - confirm it is used
-  private val nextId = new AtomicLong(0)
-
-  /** Returns the current value of `System.nanoTime()` rendered as a string. */
-  def nextQueryId: String = System.nanoTime().toString
-
-  // loaded from the store path while the instance is constructed; the fields declared above
-  // have to be initialized before this line runs because loadMetadata uses them
-  val metadata = loadMetadata(storePath, nextQueryId)
-
-  /**
-   * Returns the last updated time of the first table whose database and table name match
-   * the given names (ignoring case). Despite the method name, the value comes from
-   * `getTableLastUpdatedTime`. Fails with a `NoSuchElementException` when no table matches.
-   *
-   * @param databaseName database name of the table
-   * @param tableName    name of the table
-   */
-  def getTableCreationTime(databaseName: String, tableName: String): Long = {
-    metadata.tablesMeta
-      .filter { meta =>
-        meta.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
-        meta.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)
-      }
-      .head
-      .carbonTable
-      .getTableLastUpdatedTime
-  }
-
-  /**
-   * Deletes the store path and clears the metadata held in memory. This is best effort:
-   * non-fatal failures are logged and not propagated.
-   */
-  def cleanStore(): Unit = {
-    try {
-      val fileType = FileFactory.getFileType(storePath)
-      FileFactory.deleteFile(storePath, fileType)
-      metadata.clear()
-    } catch {
-      // only non-fatal problems are swallowed; fatal JVM errors such as OutOfMemoryError
-      // must reach the caller instead of being logged and ignored
-      case scala.util.control.NonFatal(e) => LOGGER.error(e, "clean store failed")
-    }
-  }
-
-  /**
-   * Convenience overload that wraps the names into a `TableIdentifier` and delegates to the
-   * identifier based lookup.
-   */
-  def lookupRelation(dbName: Option[String], tableName: String)
-    (sparkSession: SparkSession): LogicalPlan = {
-    val identifier = TableIdentifier(tableName, dbName)
-    lookupRelation(identifier)(sparkSession)
-  }
-
-  /**
-   * Builds the `CarbonRelation` of the given table from the metadata held in memory.
-   * The schema modified time check runs first; a missing database name resolves to the current
-   * database of the session.
-   *
-   * @param tableIdentifier table to look up
-   * @param alias           optional alias passed on to the relation
-   * @param sparkSession    active spark session
-   * @throws NoSuchTableException when the metadata has no entry for the table
-   */
-  def lookupRelation(tableIdentifier: TableIdentifier, alias: Option[String] = None)
-    (sparkSession: SparkSession): LogicalPlan = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    getTableFromMetadata(database, tableIdentifier.table) match {
-      case Some(tableMeta) =>
-        val sparkMeta = CarbonSparkUtil.createSparkMeta(tableMeta.carbonTable)
-        CarbonRelation(database, tableIdentifier.table, sparkMeta, tableMeta, alias)
-      case None =>
-        throw new NoSuchTableException(database, tableIdentifier.table)
-    }
-  }
-
-  /**
-   * Searches the catalog metadata for a table, comparing database and table name
-   * case-insensitively.
-   *
-   * @param database  database name of the table
-   * @param tableName name of the table
-   * @return the first matching table meta, or None when there is no match
-   */
-  def getTableFromMetadata(database: String,
-      tableName: String): Option[TableMeta] = {
-    metadata.tablesMeta.find { meta =>
-      val identifier = meta.carbonTableIdentifier
-      identifier.getDatabaseName.equalsIgnoreCase(database) &&
-      identifier.getTableName.equalsIgnoreCase(tableName)
-    }
-  }
-
-  /**
-   * Tells whether the metadata held in memory contains the table, comparing names
-   * case-insensitively. The schema modified time check runs first; a missing database name
-   * resolves to the current database of the session.
-   */
-  def tableExists(
-      table: String,
-      databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = databaseOp.getOrElse(sparkSession.catalog.currentDatabase)
-    metadata.tablesMeta.exists { meta =>
-      meta.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-      meta.carbonTableIdentifier.getTableName.equalsIgnoreCase(table)
-    }
-  }
-
-  /**
-   * Identifier based variant of the existence check: the schema modified time check runs
-   * first, a missing database name resolves to the current database of the session and names
-   * are compared case-insensitively.
-   */
-  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    getTableFromMetadata(database, tableIdentifier.table).isDefined
-  }
-
-  /**
-   * Loads the metadata of all tables found below the given path and records a driver
-   * statistic for the load.
-   *
-   * Besides reading the schemas, this method changes global carbon properties: the zookeeper
-   * url of the configuration is copied into them when set, and the lock type is defaulted to
-   * the HDFS lock when none is configured and the path is of file type HDFS.
-   *
-   * @param metadataPath path below which the database and table folders are located
-   * @param queryId      id under which the driver statistic is recorded
-   * @return the loaded metadata, or null when `metadataPath` is null
-   */
-  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    // creating zookeeper instance once.
-    // if zookeeper is configured as carbon lock type.
-    val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
-    if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
-    }
-    // without a path there is nothing to load; callers have to cope with a null result
-    if (metadataPath == null) {
-      return null
-    }
-    // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
-        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
-          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
-        )
-      LOGGER.info("Default lock type HDFSLOCK is configured")
-    }
-    val fileType = FileFactory.getFileType(metadataPath)
-    val metaDataBuffer = new ArrayBuffer[TableMeta]
-    fillMetaData(metadataPath, fileType, metaDataBuffer)
-    // remember the timestamp read via readSchemaFileSystemTime as the last seen schema update
-    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
-      System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    MetaData(metaDataBuffer)
-  }
-
-  /**
-   * Scans two folder levels below the base path (database folders, then table folders) and, for
-   * every table folder that contains a schema file, loads the table into `CarbonMetadata` and
-   * appends its `TableMeta` to the given buffer. When the base path does not exist, or a
-   * `FileNotFoundException` is raised while scanning, the base path is created instead.
-   *
-   * @param basePath       path that contains the database folders
-   * @param fileType       file type of the base path
-   * @param metaDataBuffer buffer receiving one entry per loaded table
-   */
-  private def fillMetaData(basePath: String, fileType: FileType,
-      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
-    val databasePath = basePath // + "/schemas"
-    try {
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val file = FileFactory.getCarbonFile(databasePath, fileType)
-        val databaseFolders = file.listFiles()
-
-        databaseFolders.foreach(databaseFolder => {
-          if (databaseFolder.isDirectory) {
-            val dbName = databaseFolder.getName
-            val tableFolders = databaseFolder.listFiles()
-
-            tableFolders.foreach(tableFolder => {
-              if (tableFolder.isDirectory) {
-                // the random table id only serves to build an identifier for path resolution
-                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                  tableFolder.getName, UUID.randomUUID().toString)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
-                  carbonTableIdentifier)
-                val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
-                // table folders without a schema file are skipped silently
-                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-                  val tableName = tableFolder.getName
-                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-                  val tableInfo: TableInfo = readSchemaFile(tableMetadataFile)
-                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
-                  val wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
-                  // NOTE(review): the schema file is read from basePath while the paths stored
-                  // on the table info are resolved from storePath - confirm both always match
-                  val schemaFilePath = CarbonStorePath
-                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-                  wrapperTableInfo.setStorePath(storePath)
-                  wrapperTableInfo
-                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
-                }
-              }
-            })
-          }
-        })
-      } else {
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-    }
-  }
-
-  /**
-   * This method will read the schema file from a given path
-   *
-   * @param schemaFilePath path of the schema file to read
-   * @return the thrift table info read from the file
-   */
-  def readSchemaFile(schemaFilePath: String): TableInfo = {
-    val createTBase = new ThriftReader.TBaseCreator() {
-      override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-        new TableInfo()
-      }
-    }
-    val thriftReader = new ThriftReader(schemaFilePath, createTBase)
-    thriftReader.open()
-    // close the reader even when reading fails, otherwise the opened file leaks
-    try {
-      thriftReader.read().asInstanceOf[TableInfo]
-    } finally {
-      thriftReader.close()
-    }
-  }
-
-  /**
-   * Appends the given schema evolution entry to the history of the thrift table info and
-   * overwrites the stored schema with the result.
-   *
-   * @param carbonTableIdentifier identifier providing the database and table name
-   * @param thriftTableInfo       thrift schema to store
-   * @param schemaEvolutionEntry  entry appended to the evolution history
-   * @param carbonStorePath       store path handed over to the schema conversion
-   * @param sparkSession          active spark session
-   * @return the path returned by `createSchemaThriftFile`
-   */
-  def updateTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
-    val dbName = carbonTableIdentifier.getDatabaseName
-    val tableName = carbonTableIdentifier.getTableName
-    // the entry has to be part of the history before the table info is converted
-    thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
-    val wrapperTableInfo = new ThriftWrapperSchemaConverterImpl()
-      .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, carbonStorePath)
-    createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, dbName, tableName)(sparkSession)
-  }
-
-  /**
-   * This method is used to remove the latest schema evolution entry in case of failure and to
-   * write the schema file again.
-   *
-   * NOTE(review): the wrapper table info is converted before the entry is removed from the
-   * thrift table info - confirm that this order is intended.
-   *
-   * @param carbonTableIdentifier identifier providing the database and table name
-   * @param thriftTableInfo       thrift schema whose last evolution entry is removed
-   * @param carbonStorePath       store path handed over to the schema conversion
-   * @param sparkSession          active spark session
-   * @return the path returned by `createSchemaThriftFile`
-   */
-  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
-    val dbName = carbonTableIdentifier.getDatabaseName
-    val tableName = carbonTableIdentifier.getTableName
-    val wrapperTableInfo = new ThriftWrapperSchemaConverterImpl()
-      .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, carbonStorePath)
-    val history = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
-    history.remove(history.size() - 1)
-    createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, dbName, tableName)(sparkSession)
-  }
-
-
-
-  /**
-   * Prepares the thrift schema from the wrapper table info, writes it to the schema file and
-   * loads the carbon table from the wrapper table info.
-   *
-   * Fails with the error raised by `sys.error` when the table already exists.
-   *
-   * @param tableInfo    wrapper table info of the table to create
-   * @param dbName       database name of the table
-   * @param tableName    name of the table
-   * @param sparkSession active spark session
-   * @return the path returned by `createSchemaThriftFile`
-   */
-  def createTableFromThrift(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      dbName: String, tableName: String)
-    (sparkSession: SparkSession): String = {
-    val alreadyExists = tableExists(tableName, Some(dbName))(sparkSession)
-    if (alreadyExists) {
-      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
-    }
-    // the first history entry carries the last updated time of the table info
-    val creationEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
-    val thriftTableInfo = new ThriftWrapperSchemaConverterImpl()
-      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-      .add(creationEntry)
-    val tablePath =
-      createSchemaThriftFile(tableInfo, thriftTableInfo, dbName, tableName)(sparkSession)
-    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    tablePath
-  }
-
-  /**
-   * This method will write the schema thrift file in carbon store and load table metadata
-   *
-   * The metadata folder is created when missing, the schema file is overwritten, the cached
-   * entry of the table is replaced and the schema modified time is touched.
-   *
-   * @param tableInfo       wrapper table info that is loaded into `CarbonMetadata`
-   * @param thriftTableInfo thrift table info that is written to the schema file
-   * @param dbName          database name of the table
-   * @param tableName       name of the table
-   * @param sparkSession    active spark session
-   * @return path of the carbon table
-   */
-  private def createSchemaThriftFile(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      dbName: String, tableName: String)
-    (sparkSession: SparkSession): String = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
-    val fileType = FileFactory.getFileType(schemaMetadataPath)
-    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      FileFactory.mkdirs(schemaMetadataPath, fileType)
-    }
-    val thriftWriter = new ThriftWriter(schemaFilePath, false)
-    thriftWriter.open(FileWriteOperation.OVERWRITE)
-    // close the writer even when writing fails, otherwise the opened file leaks
-    try {
-      thriftWriter.write(thriftTableInfo)
-    } finally {
-      thriftWriter.close()
-    }
-    // replace the cached entry of the table with the freshly loaded one
-    removeTableFromMetadata(dbName, tableName)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
-    metadata.tablesMeta += tableMeta
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
-  }
-
-  /**
-   * Removes the table meta from the catalog metadata array and the table from
-   * `CarbonMetadata`. When the catalog has no entry for the table, only a debug message is
-   * logged.
-   *
-   * @param dbName    database name of the table
-   * @param tableName name of the table
-   */
-  def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    getTableFromMetadata(dbName, tableName).fold {
-      LOGGER.debug(s"No entry for table $tableName in database $dbName")
-    } { tableMeta =>
-      metadata.tablesMeta -= tableMeta
-      CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-    }
-  }
-
-  /**
-   * Loads the wrapper table info into `CarbonMetadata` and assigns the resulting carbon table
-   * to every catalog entry with the same table unique name.
-   *
-   * @param wrapperTableInfo table info to load
-   */
-  private def updateMetadataByWrapperTable(
-      wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
-    val carbonMetadata = CarbonMetadata.getInstance()
-    carbonMetadata.loadTableMetadata(wrapperTableInfo)
-    val reloadedTable = CarbonMetadata.getInstance().getCarbonTable(
-      wrapperTableInfo.getTableUniqueName)
-    metadata.tablesMeta.foreach { tableMeta =>
-      val sameTable = wrapperTableInfo.getTableUniqueName.equals(
-        tableMeta.carbonTableIdentifier.getTableUniqueName)
-      if (sameTable) {
-        tableMeta.carbonTable = reloadedTable
-      }
-    }
-  }
-
-  /**
-   * Stamps the first schema evolution entry of the thrift table info with the current time,
-   * converts the table info into the wrapper table info and updates the metadata held in
-   * memory with it.
-   *
-   * @param schemaFilePath path of the schema file; its folder becomes the metadata file path
-   * @param tableInfo      thrift table info to load; needs at least one evolution entry
-   * @param dbName         database name of the table
-   * @param tableName      name of the table
-   * @param storePath      store path set on the table info (shadows the class member)
-   */
-  def updateMetadataByThriftTable(schemaFilePath: String,
-      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
-    val evolutionHistory =
-      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-    evolutionHistory.get(0).setTime_stamp(System.currentTimeMillis())
-    val wrapperTableInfo = new ThriftWrapperSchemaConverterImpl()
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
-    val metadataFolder = CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    wrapperTableInfo.setMetaDataFilepath(metadataFolder)
-    wrapperTableInfo.setStorePath(storePath)
-    updateMetadataByWrapperTable(wrapperTableInfo)
-  }
-
-  /**
-   * Returns the database name of every table in the catalog, restricted to the names that
-   * contain the given text when one is passed. A database shows up once per table it
-   * contains.
-   *
-   * @param schemaLike optional text the database name has to contain
-   */
-  def showDatabases(schemaLike: Option[String]): Seq[String] = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val databaseNames = metadata.tablesMeta.map(_.carbonTableIdentifier.getDatabaseName)
-    schemaLike match {
-      case Some(name) => databaseNames.filter(_.contains(name))
-      case _ => databaseNames.filter(_ != null)
-    }
-  }
-
-  /**
-   * Returns the identifiers (table name plus database name) of all tables in the catalog,
-   * after the schema modified time check has run.
-   */
-  def getAllTables(): Seq[TableIdentifier] = {
-    checkSchemasModifiedTimeAndReloadTables()
-    for (tableMeta <- metadata.tablesMeta) yield {
-      val identifier = tableMeta.carbonTableIdentifier
-      TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))
-    }
-  }
-
-  /**
-   * Tells whether the path of the table exists below the store path. A missing database name
-   * resolves to the current database of the session; only the table name is lower-cased.
-   */
-  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val carbonIdentifier =
-      new CarbonTableIdentifier(dbName, tableIdentifier.table.toLowerCase, "")
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath, carbonIdentifier).getPath
-    FileFactory.isFileExist(tablePath, FileFactory.getFileType(tablePath))
-  }
-
-  /**
-   * Drops a carbon table. Nothing happens unless the metadata directory of the table exists;
-   * in that case the parent folder of the metadata directory is deleted, the catalog entry is
-   * removed (touching the schema modified time), the table is invalidated and dropped through
-   * `CarbonHiveMetadataUtil` and the session catalog is refreshed.
-   *
-   * @param tableStorePath  store path used to resolve the table's metadata directory
-   * @param tableIdentifier table to drop; its database name has to be set
-   * @param sparkSession    active spark session
-   */
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): Unit = {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-    val metadataDirPath = CarbonStorePath
-      .getCarbonTablePath(tableStorePath, new CarbonTableIdentifier(dbName, tableName, ""))
-      .getMetadataDirectoryPath
-    val fileType = FileFactory.getFileType(metadataDirPath)
-
-    if (FileFactory.isFileExist(metadataDirPath, fileType)) {
-      // pick up schema changes made elsewhere before the table is dropped
-      checkSchemasModifiedTimeAndReloadTables()
-      val metadataDir = FileFactory.getCarbonFile(metadataDirPath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(metadataDir.getParentFile)
-      getTableFromMetadata(dbName, tableIdentifier.table) match {
-        case None =>
-          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-      }
-      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
-      // discard cached table info in cachedDataSourceTables
-      sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    }
-  }
-
-  // Resolves the schema timestamp file under storePath together with its file type.
-  // Both parameters are ignored: the same file is returned for every database and table.
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val path = s"$storePath/${ CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE }"
-    (path, FileFactory.getFileType(path))
-  }
-
-  /**
-   * This method will put the updated timestamp of schema file in the table modified time store map
-   *
-   * @param timeStamp
-   */
-  def updateSchemasUpdatedTime(timeStamp: Long): Unit = {
-    // Store under the same key constant that checkSchemasModifiedTimeAndReloadTables reads,
-    // rather than a separate "default" literal that could silently drift away from it.
-    tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
-  }
-
-  /**
-   * Reads the last-modified time of the empty schema timestamp file, falling back to the
-   * current system time when that file does not exist.
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    getTimestampFileAndType(databaseName, tableName) match {
-      case (path, fileType) if FileFactory.isFileExist(path, fileType) =>
-        FileFactory.getCarbonFile(path, fileType).getLastModifiedTime
-      case _ =>
-        // No timestamp file yet: report the current system time instead.
-        System.currentTimeMillis()
-    }
-  }
-
-  /**
-   * Creates the empty schema timestamp file if it is missing, sets its last-modified time
-   * to the current system time and returns that time.
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (path, fileType) = getTimestampFileAndType(databaseName, tableName)
-    val alreadyPresent = FileFactory.isFileExist(path, fileType)
-    if (!alreadyPresent) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
-      FileFactory.createNewFile(path, fileType)
-    }
-    // Stamp the file with the current time and hand the same value back to the caller.
-    val now = System.currentTimeMillis()
-    val timestampCarbonFile = FileFactory.getCarbonFile(path, fileType)
-    timestampCarbonFile.setLastModifiedTime(now)
-    now
-  }
-
-  /**
-   * Reloads the cached table metadata when the schema timestamp file exists and its
-   * last-modified time differs from the value held in tableModifiedTimeStore.
-   */
-  def checkSchemasModifiedTimeAndReloadTables(): Unit = {
-    val (path, fileType) = getTimestampFileAndType("", "")
-    if (FileFactory.isFileExist(path, fileType)) {
-      val onDisk = FileFactory.getCarbonFile(path, fileType).getLastModifiedTime
-      val cached = tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      if (onDisk != cached) {
-        refreshCache()
-      }
-    }
-  }
-
-  /**
-   * Replaces the cached table list with the one produced by a fresh loadMetadata call
-   * on storePath.
-   */
-  def refreshCache(): Unit = {
-    val reloaded = loadMetadata(storePath, nextQueryId)
-    metadata.tablesMeta = reloaded.tablesMeta
-  }
-
-  /**
-   * Returns the last-modified time of the schema timestamp file, or the current system
-   * time when that file does not exist.
-   *
-   * @param databaseName passed through to readSchemaFileSystemTime
-   * @param tableName    passed through to readSchemaFileSystemTime
-   * @return the schema timestamp in milliseconds
-   */
-  def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
-    // Same lookup as readSchemaFileSystemTime; delegate instead of keeping a second copy
-    // that tracks the result in a mutable local.
-    readSchemaFileSystemTime(databaseName, tableName)
-  }
-
-  /**
-   * Reads the binary "metadata" file stored directly under the given table folder.
-   *
-   * Each record in the file is a sequence of length-prefixed fields: database name, table
-   * name, data path, version, schema and a Java-serialized Partitioner, followed by the
-   * table creation time. When the file holds several records, the values of the last
-   * record are returned. When the file does not exist, empty strings, a null partitioner
-   * and the default creation time are returned.
-   *
-   * @param tableFolder folder expected to contain the "metadata" file
-   * @param fileType    file system type used to open the file
-   * @return (databaseName, tableName, dataPath, schema, partitioner, tableCreationTime)
-   */
-  def readTableMetaDataFile(tableFolder: CarbonFile,
-      fileType: FileFactory.FileType):
-  (String, String, String, String, Partitioner, Long) = {
-    val tableMetadataFile = tableFolder.getAbsolutePath + "/metadata"
-
-    var schema: String = ""
-    var databaseName: String = ""
-    var tableName: String = ""
-    var dataPath: String = ""
-    var partitioner: Partitioner = null
-    // Default creation time, kept when the file is missing or ends before a creation time.
-    var tableCreationTime = new GregorianCalendar(2011, 1, 1).getTime.getTime
-
-    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-      // load metadata
-      val in = FileFactory.getDataInputStream(tableMetadataFile, fileType)
-
-      // Reads exactly `length` bytes of the current field from the stream.
-      def readBlock(length: Int): Array[Byte] = {
-        val bytes = new Array[Byte](length)
-        in.readFully(bytes)
-        bytes
-      }
-
-      // Close the stream even when a read fails part-way through a record; previously an
-      // exception here leaked the open stream.
-      try {
-        // An empty file has no leading length; treat it as "no records".
-        var len = try in.readInt() catch { case _: EOFException => 0 }
-
-        while (len > 0) {
-          databaseName = new String(readBlock(len), "UTF8")
-          tableName = new String(readBlock(in.readInt()), "UTF8")
-          dataPath = new String(readBlock(in.readInt()), "UTF8")
-          // The version field is consumed only to advance the stream; its value is unused.
-          readBlock(in.readInt())
-          schema = new String(readBlock(in.readInt()), "UTF8")
-
-          // NOTE(review): this uses Java deserialization on bytes read from the store;
-          // confirm the metadata file can only be written by trusted processes.
-          val objStream = new ObjectInputStream(new ByteArrayInputStream(readBlock(in.readInt())))
-          try {
-            partitioner = objStream.readObject().asInstanceOf[Partitioner]
-          } finally {
-            objStream.close()
-          }
-
-          // End of file after a record terminates the loop.
-          try {
-            tableCreationTime = in.readLong()
-            len = in.readInt()
-          } catch {
-            case _: EOFException => len = 0
-          }
-        }
-      } finally {
-        in.close()
-      }
-    }
-
-    (databaseName, tableName, dataPath, schema, partitioner, tableCreationTime)
-  }
-
-  /**
-   * Creates the directory for a database under storePath, using the lower-cased
-   * database name as the directory name.
-   *
-   * @param dbName database name; lower-cased before the path is built
-   */
-  def createDatabaseDirectory(dbName: String): Unit = {
-    val databasePath = s"$storePath${ File.separator }${ dbName.toLowerCase }"
-    FileFactory.mkdirs(databasePath, FileFactory.getFileType(databasePath))
-  }
-
-  /**
-   * Deletes the directory of a database under storePath, if it exists.
-   *
-   * createDatabaseDirectory lower-cases the name when it creates the directory, so the
-   * lower-cased spelling is removed as well as the name exactly as given. Without this a
-   * mixed-case name would miss the directory on a case-sensitive file system.
-   *
-   * @param dbName database name whose directory should be removed
-   */
-  def dropDatabaseDirectory(dbName: String): Unit = {
-    Seq(dbName, dbName.toLowerCase).distinct.foreach { directoryName =>
-      val databasePath = storePath + File.separator + directoryName
-      val fileType = FileFactory.getFileType(databasePath)
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
-        CarbonUtil.deleteFoldersAndFiles(dbPath)
-      }
-    }
-  }
-
-}
-
-
-object CarbonMetastoreTypes extends RegexParsers {
-  // Primitive type names accepted by toDataType. "bigint" is accepted because
-  // toMetastoreType renders LongType as "bigint"; without it that string could not be
-  // parsed back. Both "tinyint" and "short" map to ShortType.
-  protected lazy val primitiveType: Parser[DataType] =
-    "string" ^^^ StringType |
-    "float" ^^^ FloatType |
-    "int" ^^^ IntegerType |
-    "tinyint" ^^^ ShortType |
-    "short" ^^^ ShortType |
-    "double" ^^^ DoubleType |
-    "long" ^^^ LongType |
-    "bigint" ^^^ LongType |
-    "binary" ^^^ BinaryType |
-    "boolean" ^^^ BooleanType |
-    fixedDecimalType |
-    "decimal" ^^^ DecimalType(10, 0) |
-    "varchar\\((\\d+)\\)".r ^^^ StringType |
-    "date" ^^^ DateType |
-    "timestamp" ^^^ TimestampType
-
-  // "decimal(precision,scale)"; tried before the bare "decimal" alternative above.
-  protected lazy val fixedDecimalType: Parser[DataType] =
-    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
-      case precision ~ scale =>
-        DecimalType(precision.toInt, scale.toInt)
-    }
-
-  // "array<elementType>"
-  protected lazy val arrayType: Parser[DataType] =
-    "array" ~> "<" ~> dataType <~ ">" ^^ {
-      case tpe => ArrayType(tpe)
-    }
-
-  // "map<keyType,valueType>"
-  protected lazy val mapType: Parser[DataType] =
-    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
-      case t1 ~ _ ~ t2 => MapType(t1, t2)
-    }
-
-  // "name:type" entry of a struct; every field is created as nullable.
-  protected lazy val structField: Parser[StructField] =
-    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
-      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
-    }
-
-  // "struct<name:type,...>"
-  protected lazy val structType: Parser[DataType] =
-    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
-      case fields => StructType(fields)
-    }
-
-  protected lazy val dataType: Parser[DataType] =
-    arrayType |
-    mapType |
-    structType |
-    primitiveType
-
-  /**
-   * Parses a metastore type string into a Spark SQL DataType.
-   *
-   * @param metastoreType type string such as "int", "decimal(10,2)" or "array<string>"
-   * @return the parsed DataType
-   * @throws RuntimeException (via sys.error) when the whole string cannot be parsed
-   */
-  def toDataType(metastoreType: String): DataType = {
-    parseAll(dataType, metastoreType) match {
-      case Success(result, _) => result
-      case _: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
-    }
-  }
-
-  /**
-   * Renders a Spark SQL DataType as a metastore type string.
-   *
-   * @param dt type to render
-   * @return the type string; types without a case below raise a MatchError
-   */
-  def toMetastoreType(dt: DataType): String = {
-    dt match {
-      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
-      // The parser accepts map<k,v>, so render it too instead of failing with a MatchError.
-      case MapType(keyType, valueType, _) =>
-        s"map<${ toMetastoreType(keyType) },${ toMetastoreType(valueType) }>"
-      case StructType(fields) =>
-        s"struct<${
-          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
-            .mkString(",")
-        }>"
-      case StringType => "string"
-      case FloatType => "float"
-      case IntegerType => "int"
-      case ShortType => "tinyint"
-      case DoubleType => "double"
-      case LongType => "bigint"
-      case BinaryType => "binary"
-      case BooleanType => "boolean"
-      // NOTE(review): precision and scale are dropped here, so parsing the result back
-      // yields DecimalType(10, 0) - confirm whether any caller relies on the bare name.
-      case DecimalType() => "decimal"
-      case TimestampType => "timestamp"
-      case DateType => "date"
-    }
-  }
-}
-
-
-/**
- * Represents logical plan for one carbon table
- */
-case class CarbonRelation(
-    databaseName: String,
-    tableName: String,
-    var metaData: CarbonMetaData,
-    tableMeta: TableMeta,
-    alias: Option[String])
-  extends LeafNode with MultiInstanceRelation {
-
-  /**
-   * Renders one child of a struct dimension as a "name:type" entry, where the name is the
-   * child's column name with the parent prefix (and the following separator) removed.
-   *
-   * @param dimName  column name of the parent dimension
-   * @param childDim child dimension to render
-   */
-  def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
-    val fieldName = childDim.getColName.substring(dimName.length + 1)
-    childDim.getDataType.toString.toLowerCase match {
-      case "array" => s"$fieldName:array<${ getArrayChildren(childDim.getColName) }>"
-      case "struct" => s"$fieldName:struct<${ getStructChildren(childDim.getColName) }>"
-      // Keep precision and scale for decimals, as getArrayChildren does; a bare "decimal"
-      // would be parsed back as decimal(10,0).
-      case dType => s"$fieldName:${ addDecimalScaleAndPrecision(childDim, dType) }"
-    }
-  }
-
-  /**
-   * Renders the children of an array dimension as a comma separated list of type strings.
-   *
-   * @param dimName column name of the array dimension
-   */
-  def getArrayChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.toString.toLowerCase match {
-        case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
-        case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
-        case dType => addDecimalScaleAndPrecision(childDim, dType)
-      }
-    }).mkString(",")
-  }
-
-  /**
-   * Renders the children of a struct dimension as a comma separated list of "name:type"
-   * entries.
-   *
-   * @param dimName column name of the struct dimension
-   */
-  def getStructChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala
-      .map(childDim => recursiveMethod(dimName, childDim))
-      .mkString(",")
-  }
-
-  override def newInstance(): LogicalPlan = {
-    CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)
-      .asInstanceOf[this.type]
-  }
-
-  // One nullable attribute per dimension of the table, with complex dimensions expanded
-  // into array<...> / struct<...> types.
-  val dimensionsAttr = {
-    val sett = new LinkedHashSet(
-      tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
-        .asScala.asJava)
-    sett.asScala.toSeq.map(dim => {
-      val dimval = metaData.carbonTable
-        .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
-      val output: DataType = dimval.getDataType
-        .toString.toLowerCase match {
-        case "array" =>
-          CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
-        case "struct" =>
-          CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
-        case dType =>
-          val dataType = addDecimalScaleAndPrecision(dimval, dType)
-          CarbonMetastoreTypes.toDataType(dataType)
-      }
-
-      AttributeReference(
-        dim.getColName,
-        output,
-        nullable = true)()
-    })
-  }
-
-  // One nullable attribute per measure of the fact table; decimals carry the measure's
-  // precision and scale.
-  val measureAttr = {
-    val factTable = tableMeta.carbonTable.getFactTableName
-    new LinkedHashSet(
-      tableMeta.carbonTable.
-        getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
-        asScala.asJava).asScala.toSeq
-      .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
-        metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
-          .toLowerCase match {
-          case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
-          case others => others
-        }),
-        nullable = true)())
-  }
-
-  // Visible columns in create order, each qualified as "<tableName>.<columnName>".
-  override val output = {
-    val carbonTable = tableMeta.carbonTable
-    val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName).asScala
-    // convert each column to Attribute
-    columns.filter(!_.isInvisible).map { column =>
-      val sparkType: DataType =
-        if (column.isDimension()) {
-          column.getDataType.toString.toLowerCase match {
-            case "array" =>
-              CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
-            case "struct" =>
-              CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
-            case dType =>
-              CarbonMetastoreTypes.toDataType(addDecimalScaleAndPrecision(column, dType))
-          }
-        } else {
-          CarbonMetastoreTypes.toDataType {
-            column.getDataType.toString
-              .toLowerCase match {
-              case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
-                .getColumnSchema.getScale + ")"
-              case others => others
-            }
-          }
-        }
-      AttributeReference(column.getColName, sparkType, nullable = true)(
-        qualifier = Option(tableName + "." + column.getColName))
-    }
-  }
-
-  /**
-   * Appends "(precision,scale)" to the type name when the column is a decimal; any other
-   * type name is returned unchanged.
-   *
-   * @param dimval   column whose data type and column schema are inspected
-   * @param dataType lower-cased type name of the column
-   */
-  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
-    if (dimval.getDataType == DECIMAL) {
-      val columnSchema = dimval.getColumnSchema
-      s"$dataType(${ columnSchema.getPrecision },${ columnSchema.getScale })"
-    } else {
-      dataType
-    }
-  }
-
-  // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
-
-  // Two relations are equal when database name, table name and output attributes match.
-  override def equals(other: Any): Boolean = {
-    other match {
-      case p: CarbonRelation =>
-        p.databaseName == databaseName && p.output == output && p.tableName == tableName
-      case _ => false
-    }
-  }
-
-  // Hash exactly the fields that equals compares, so equal relations always share a hash.
-  override def hashCode(): Int = (databaseName, tableName, output).hashCode()
-
-  /**
-   * Appends "(precision,scale)" to the type name when the dimension is a decimal; any
-   * other type name is returned unchanged.
-   *
-   * @param dimval   dimension whose data type and column schema are inspected
-   * @param dataType lower-cased type name of the dimension
-   */
-  def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
-    if (dimval.getDataType == DECIMAL) {
-      val columnSchema = dimval.getColumnSchema
-      s"$dataType(${ columnSchema.getPrecision },${ columnSchema.getScale })"
-    } else {
-      dataType
-    }
-  }
-
-  // Last-modified time of the table status seen when the size was last computed.
-  private var tableStatusLastUpdateTime = 0L
-
-  // Directory size cached from the last computation.
-  private var sizeInBytesLocalValue = 0L
-
-  /**
-   * Returns the size of the table directory in bytes. The directory is measured again
-   * only when the table status last-modified time has changed since the previous
-   * measurement and the table path exists; otherwise the cached value is returned.
-   */
-  def sizeInBytes: Long = {
-    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
-      tableMeta.carbonTable.getAbsoluteTableIdentifier)
-
-    if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        tableMeta.storePath,
-        tableMeta.carbonTableIdentifier).getPath
-      val fileType = FileFactory.getFileType(tablePath)
-      if(FileFactory.isFileExist(tablePath, fileType)) {
-        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
-        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
-      }
-    }
-    sizeInBytesLocalValue
-  }
-
-}