Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:03 UTC
[3/7] carbondata git commit: [CARBONDATA-1284]Implement hive based schema storage in carbon
[CARBONDATA-1284]Implement hive based schema storage in carbon
This closes #1149
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/042a05a5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/042a05a5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/042a05a5
Branch: refs/heads/master
Commit: 042a05a58cec223086ad55ad48ca27e34c40d135
Parents: 92fe63c
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jul 8 17:14:04 2017 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Thu Jul 27 18:47:52 2017 +0800
----------------------------------------------------------------------
.../execution/command/carbonTableSchema.scala | 30 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 384 ++++++++++---------
2 files changed, 225 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
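For orientation: the carbonTableSchema.scala hunk below changes how CreateTable registers the table with the Spark/Hive catalog. The generated CREATE TABLE ... USING org.apache.spark.sql.CarbonSource statement carries tableName, dbName and tablePath in its OPTIONS clause, and the serialized carbon schema string is now concatenated immediately after the tablePath value. The following is a minimal, self-contained sketch of that statement construction, not the committed code; the object name and the dbName/tbName/tablePath/rawSchemas/carbonSchemaString inputs are illustrative placeholders.

// Sketch only: mirrors the string-building pattern in the diff below.
// In the real command these values come from the TableModel and the metastore.
object CreateTableSqlSketch {

  def buildCreateTableSql(
      dbName: String,
      tbName: String,
      tablePath: String,
      rawSchemas: Seq[String],
      carbonSchemaString: String): String = {
    s"""CREATE TABLE $dbName.$tbName
       |(${ rawSchemas.mkString(",") })
       |USING org.apache.spark.sql.CarbonSource""".stripMargin +
      s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """ +
      s""""$tablePath"$carbonSchemaString) """
  }

  def main(args: Array[String]): Unit = {
    // The schema-string argument is a placeholder; the diff suggests the real
    // value already carries its own leading delimiter, which is why the hunk
    // drops the space before $carbonSchemaString.
    println(buildCreateTableSql(
      "default", "t1", "/tmp/carbonstore/default/t1",
      Seq("id int", "name string"),
      ", carbonSchema \"{...}\""))
  }
}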
http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 46b58c5..1781477 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -233,10 +233,10 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
sparkSession.sql(
s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath" $carbonSchemaString) """)
+ |(${ fields.map(f => f.rawSchema).mkString(",") })
+ |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+ s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+ s""""$tablePath"$carbonSchemaString) """)
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -268,8 +268,8 @@ case class DeleteLoadsById(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadById(
loadids,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -293,8 +293,8 @@ case class DeleteLoadsByLoadDate(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadByDate(
loadDate,
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -750,11 +750,11 @@ case class LoadTable(
(dataFrame, dataFrame)
}
- GlobalDictionaryUtil.generateGlobalDictionary(
- sparkSession.sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- dictionaryDataFrame)
+ GlobalDictionaryUtil.generateGlobalDictionary(
+ sparkSession.sqlContext,
+ carbonLoadModel,
+ relation.tableMeta.storePath,
+ dictionaryDataFrame)
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
@@ -847,8 +847,8 @@ case class ShowLoads(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.showSegments(
getDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 2407054..048681c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,23 +22,22 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -63,7 +62,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
-class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
@transient
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -78,7 +77,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
System.nanoTime() + ""
}
- val metadata = MetaData(new ArrayBuffer[TableMeta]())
+ lazy val metadata = loadMetadata(storePath, nextQueryId)
/**
@@ -91,22 +90,9 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
override def createCarbonRelation(parameters: Map[String, String],
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): CarbonRelation = {
- val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
- val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
- val tables = getTableFromMetadataCache(database, tableName)
- tables match {
- case Some(t) =>
- CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
- case None =>
- readCarbonSchema(absIdentifier) match {
- case Some(meta) =>
- CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
- case None =>
- throw new NoSuchTableException(database, tableName)
- }
- }
+ lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
+ Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
+ .asInstanceOf[CarbonRelation]
}
def lookupRelation(dbName: Option[String], tableName: String)
@@ -114,21 +100,20 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
}
- override def lookupRelation(tableIdentifier: TableIdentifier)
+ def lookupRelation(tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): LogicalPlan = {
+ checkSchemasModifiedTimeAndReloadTables()
val database = tableIdentifier.database.getOrElse(
- sparkSession.catalog.currentDatabase)
- val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
- case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
- _) =>
- carbonDatasourceHadoopRelation.carbonRelation
- case LogicalRelation(
- carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
- carbonDatasourceHadoopRelation.carbonRelation
- case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+ sparkSession.catalog.currentDatabase
+ )
+ val tables = getTableFromMetadata(database, tableIdentifier.table, true)
+ tables match {
+ case Some(t) =>
+ CarbonRelation(database, tableIdentifier.table,
+ CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
+ case None =>
+ throw new NoSuchTableException(database, tableIdentifier.table)
}
- relation
}
/**
@@ -138,7 +123,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* @param tableName
* @return
*/
- def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
+ def getTableFromMetadata(database: String,
+ tableName: String, readStore: Boolean = false): Option[TableMeta] = {
metadata.tablesMeta
.find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
@@ -150,48 +136,99 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
tableExists(TableIdentifier(table, databaseOp))(sparkSession)
}
- override def tableExists(tableIdentifier: TableIdentifier)
- (sparkSession: SparkSession): Boolean = {
- try {
- lookupRelation(tableIdentifier)(sparkSession)
- } catch {
- case e: Exception =>
- return false
+ def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+ checkSchemasModifiedTimeAndReloadTables()
+ val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val tables = metadata.tablesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ tables.nonEmpty
+ }
+
+ def loadMetadata(metadataPath: String, queryId: String): MetaData = {
+ val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+ val statistic = new QueryStatistic()
+ // creating zookeeper instance once.
+ // if zookeeper is configured as carbon lock type.
+ val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
+ if (null != zookeeperurl) {
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+ }
+ if (metadataPath == null) {
+ return null
+ }
+ // if no locktype is configured and store type is HDFS set HDFS lock as default
+ if (null == CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+ FileType.HDFS == FileFactory.getFileType(metadataPath)) {
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.LOCK_TYPE,
+ CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
+ )
+ LOGGER.info("Default lock type HDFSLOCK is configured")
}
- true
+ val fileType = FileFactory.getFileType(metadataPath)
+ val metaDataBuffer = new ArrayBuffer[TableMeta]
+ fillMetaData(metadataPath, fileType, metaDataBuffer)
+ updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+ statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
+ System.currentTimeMillis())
+ recorder.recordStatisticsForDriver(statistic, queryId)
+ MetaData(metaDataBuffer)
}
- private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
- val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
- val tableName = identifier.getCarbonTableIdentifier.getTableName
- val storePath = identifier.getStorePath
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
- tableName.toLowerCase(), UUID.randomUUID().toString)
- val carbonTablePath =
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val fileType = FileFactory.getFileType(tableMetadataFile)
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableUniqueName = dbName + "_" + tableName
- val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
- val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
- identifier.getStorePath,
- identifier.getTablePath,
- carbonTable)
- metadata.tablesMeta += tableMeta
- Some(tableMeta)
- } else {
- None
+ private def fillMetaData(basePath: String, fileType: FileType,
+ metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+ val databasePath = basePath // + "/schemas"
+ try {
+ if (FileFactory.isFileExist(databasePath, fileType)) {
+ val file = FileFactory.getCarbonFile(databasePath, fileType)
+ val databaseFolders = file.listFiles()
+
+ databaseFolders.foreach(databaseFolder => {
+ if (databaseFolder.isDirectory) {
+ val dbName = databaseFolder.getName
+ val tableFolders = databaseFolder.listFiles()
+
+ tableFolders.foreach(tableFolder => {
+ if (tableFolder.isDirectory) {
+ val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
+ tableFolder.getName, UUID.randomUUID().toString)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
+ carbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableName = tableFolder.getName
+ val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
+ val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+ val schemaFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+ wrapperTableInfo.setStorePath(storePath)
+ wrapperTableInfo
+ .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+ carbonTable)
+ }
+ }
+ })
+ }
+ })
+ } else {
+ // Create folders and files.
+ FileFactory.mkdirs(databasePath, fileType)
+ }
+ } catch {
+ case s: java.io.FileNotFoundException =>
+ s.printStackTrace()
+ // Create folders and files.
+ FileFactory.mkdirs(databasePath, fileType)
}
}
@@ -201,15 +238,15 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* @param newTableIdentifier
* @param thriftTableInfo
* @param schemaEvolutionEntry
- * @param tablePath
+ * @param carbonStorePath
* @param sparkSession
*/
def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
- tablePath: String) (sparkSession: SparkSession): String = {
- val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ carbonStorePath: String)
+ (sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
@@ -218,19 +255,11 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
.fromExternalToWrapperTableInfo(thriftTableInfo,
newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
- absoluteTableIdentifier.getStorePath)
- val identifier =
- new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName,
- wrapperTableInfo.getFactTable.getTableId)
- val path = createSchemaThriftFile(wrapperTableInfo,
+ carbonStorePath)
+ createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- identifier)
- addTableCache(wrapperTableInfo,
- AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName))
- path
+ newTableIdentifier.getTableName)(sparkSession)
}
/**
@@ -238,27 +267,25 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param tablePath
+ * @param carbonStorePath
* @param sparkSession
*/
def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- tablePath: String)(sparkSession: SparkSession): String = {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ carbonStorePath: String)
+ (sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
carbonTableIdentifier.getDatabaseName,
carbonTableIdentifier.getTableName,
- tableIdentifier.getStorePath)
+ carbonStorePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
- wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
- val path = createSchemaThriftFile(wrapperTableInfo,
+ createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- tableIdentifier.getCarbonTableIdentifier)
- addTableCache(wrapperTableInfo, tableIdentifier)
- path
+ carbonTableIdentifier.getDatabaseName,
+ carbonTableIdentifier.getTableName)(sparkSession)
}
@@ -269,38 +296,24 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* Load CarbonTable from wrapper tableInfo
*
*/
- def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
+ def createTableFromThrift(
+ tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+ dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
+ if (tableExists(tableName, Some(dbName))(sparkSession)) {
+ sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+ }
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val dbName = tableInfo.getDatabaseName
- val tableName = tableInfo.getFactTable.getTableName
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- tableInfo.setStorePath(identifier.getStorePath)
- createSchemaThriftFile(tableInfo,
+ thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+ .add(schemaEvolutionEntry)
+ val carbonTablePath = createSchemaThriftFile(tableInfo,
thriftTableInfo,
- identifier.getCarbonTableIdentifier)
+ dbName,
+ tableName)(sparkSession)
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- }
-
- /**
- * Generates schema string from TableInfo
- */
- override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
- tablePath: String): String = {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
- val schemaMetadataPath =
- CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
- tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(tableIdentifier.getStorePath)
- val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
- schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
- tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
- removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- addTableCache(tableInfo, tableIdentifier)
- CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
+ (carbonTablePath, "")
}
/**
@@ -308,16 +321,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
*
* @param tableInfo
* @param thriftTableInfo
+ * @param dbName
+ * @param tableName
+ * @param sparkSession
* @return
*/
- private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
- thriftTableInfo: TableInfo,
- carbonTableIdentifier: CarbonTableIdentifier): String = {
- val carbonTablePath = CarbonStorePath.
- getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
+ private def createSchemaThriftFile(
+ tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo,
+ dbName: String, tableName: String)
+ (sparkSession: SparkSession): String = {
+ val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+ tableInfo.getFactTable.getTableId)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ tableInfo.setStorePath(storePath)
val fileType = FileFactory.getFileType(schemaMetadataPath)
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -326,20 +346,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
- carbonTablePath.getPath
- }
-
- protected def addTableCache(tableInfo: table.TableInfo,
- absoluteTableIdentifier: AbsoluteTableIdentifier) = {
- val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
- CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
- removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
+ removeTableFromMetadata(dbName, tableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
- absoluteTableIdentifier.getTablePath,
- CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
+ val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+ CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
metadata.tablesMeta += tableMeta
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+ carbonTablePath.getPath
}
/**
@@ -349,15 +362,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
* @param tableName
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
metadataToBeRemoved match {
case Some(tableMeta) =>
metadata.tablesMeta -= tableMeta
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
case None =>
- if (LOGGER.isDebugEnabled) {
- LOGGER.debug(s"No entry for table $tableName in database $dbName")
- }
+ LOGGER.debug(s"No entry for table $tableName in database $dbName")
}
}
@@ -391,23 +402,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- try {
- val tablePath = lookupRelation(tableIdentifier)(sparkSession).
- asInstanceOf[CarbonRelation].tableMeta.tablePath
- val fileType = FileFactory.getFileType(tablePath)
- FileFactory.isFileExist(tablePath, fileType)
- } catch {
- case e: Exception =>
- false
- }
+ val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val tableName = tableIdentifier.table.toLowerCase
+
+ val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+ new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+ val fileType = FileFactory.getFileType(tablePath)
+ FileFactory.isFileExist(tablePath, fileType)
}
- def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession) {
val dbName = tableIdentifier.database.get
val tableName = tableIdentifier.table
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
+
+ val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+ new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
@@ -418,18 +429,27 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while drop we should refresh the schema modified time so that if any thing has changed
// in the other beeline need to update.
- checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
-
- removeTableFromMetadata(dbName, tableName)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
+ checkSchemasModifiedTimeAndReloadTables
+ val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+ CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+ tableIdentifier.table)
+ metadataToBeRemoved match {
+ case Some(tableMeta) =>
+ metadata.tablesMeta -= tableMeta
+ CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+ case None =>
+ LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
+ }
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
}
}
- private def getTimestampFileAndType(basePath: String) = {
- val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+ private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+ val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
val timestampFileType = FileFactory.getFileType(timestampFile)
(timestampFile, timestampFileType)
}
@@ -443,20 +463,37 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
tableModifiedTimeStore.put("default", timeStamp)
}
- def updateAndTouchSchemasUpdatedTime(basePath: String) {
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
+ def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
}
+ /**
+ * This method will read the timestamp of empty schema file
+ *
+ * @param databaseName
+ * @param tableName
+ * @return
+ */
+ private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+ FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+ } else {
+ System.currentTimeMillis()
+ }
+ }
/**
* This method will check and create an empty schema timestamp file
*
+ * @param databaseName
+ * @param tableName
* @return
*/
- private def touchSchemaFileSystemTime(basePath: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
+ private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $basePath")
+ LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
FileFactory.createNewFile(timestampFile, timestampFileType)
}
val systemTime = System.currentTimeMillis()
@@ -465,9 +502,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
systemTime
}
- def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
- val (timestampFile, timestampFileType) =
- getTimestampFileAndType(storePath)
+ def checkSchemasModifiedTimeAndReloadTables() {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
@@ -478,7 +514,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
}
private def refreshCache() {
- metadata.tablesMeta.clear()
+ metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
}
override def isReadFromHiveMetaStore: Boolean = false
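The CarbonFileMetastore changes above replace the per-lookup schema read with an eager loadMetadata/fillMetaData scan of the store path when the metastore is constructed. As a rough, self-contained sketch of that scan, assuming the conventional <storePath>/<database>/<table>/Metadata/schema layout that CarbonTablePath resolves in the real code (the object name, java.nio usage and layout string are illustrative assumptions, not the committed implementation):

import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

// Rough sketch of the folder walk performed by fillMetaData; the on-disk
// layout <storePath>/<db>/<table>/Metadata/schema is an assumption here.
object SchemaScanSketch {

  // List immediate sub-directories of a path, or nothing if it does not exist.
  private def listDirs(p: Path): Seq[Path] =
    if (Files.isDirectory(p)) {
      Files.list(p).iterator().asScala.filter(Files.isDirectory(_)).toSeq
    } else {
      Seq.empty
    }

  // Collect (databaseName, tableName) for every table folder with a schema file.
  def scan(storePath: String): Seq[(String, String)] =
    for {
      dbDir <- listDirs(Paths.get(storePath))
      tableDir <- listDirs(dbDir)
      schemaFile = tableDir.resolve("Metadata").resolve("schema")
      if Files.exists(schemaFile)
    } yield (dbDir.getFileName.toString, tableDir.getFileName.toString)

  def main(args: Array[String]): Unit =
    scan(args.headOption.getOrElse("/tmp/carbonstore"))
      .foreach { case (db, tbl) => println(s"found schema for $db.$tbl") }
}

In the committed code each located schema file is then read with CarbonUtil.readSchemaFile, converted through ThriftWrapperSchemaConverterImpl, and cached as a TableMeta; the sketch stops at locating the schema files.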