You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 17:50:35 UTC
[13/18] carbondata git commit: [CARBONDATA-1573] [Integration]
Support Database Location Configuration while Creating Database/ Support
Creation of carbon Table in the database location
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 2671aad..c7db436 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -70,7 +70,6 @@ case class CarbonDictionaryDecoder(
override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -81,7 +80,7 @@ case class CarbonDictionaryDecoder(
child.execute().mapPartitions { iter =>
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
- cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
forwardDictionaryCache)
val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
@@ -124,7 +123,6 @@ case class CarbonDictionaryDecoder(
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -133,9 +131,9 @@ case class CarbonDictionaryDecoder(
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
- cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(absoluteTableIdentifiers,
- forwardDictionaryCache, storePath)
+ forwardDictionaryCache)
val exprs = child.output.map { exp =>
ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
@@ -252,7 +250,7 @@ case class CarbonDictionaryDecoder(
if (f._2 != null) {
try {
cache.get(new DictionaryColumnUniqueIdentifier(
- atiMap(f._1).getCarbonTableIdentifier,
+ atiMap(f._1),
f._2, f._3.getDataType,
CarbonStorePath.getCarbonTablePath(atiMap(f._1))))
} catch {
@@ -266,33 +264,31 @@ case class CarbonDictionaryDecoder(
}
private def getDictionaryWrapper(atiMap: Map[String, AbsoluteTableIdentifier],
- cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary], storePath: String) = {
+ cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
val allDictIdentifiers = new ArrayBuffer[DictionaryColumnUniqueIdentifier]()
val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryColumnIds.map {
case (tableName, columnIdentifier, carbonDimension) =>
if (columnIdentifier != null) {
try {
- val (newCarbonTableIdentifier, newColumnIdentifier) =
+ val (newAbsoluteTableIdentifier, newColumnIdentifier) =
if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
!carbonDimension
.getColumnSchema.getParentColumnTableRelations.isEmpty) {
- (QueryUtil.getTableIdentifierForColumn(carbonDimension),
+ (QueryUtil.getTableIdentifierForColumn(carbonDimension, atiMap(tableName)),
new ColumnIdentifier(carbonDimension.getColumnSchema
.getParentColumnTableRelations.get(0).getColumnId,
carbonDimension.getColumnProperties,
carbonDimension.getDataType))
} else {
- (atiMap(tableName).getCarbonTableIdentifier, columnIdentifier)
+ (atiMap(tableName), columnIdentifier)
}
val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
- newCarbonTableIdentifier,
+ newAbsoluteTableIdentifier,
newColumnIdentifier, carbonDimension.getDataType,
CarbonStorePath
- .getCarbonTablePath(atiMap(tableName).getStorePath, newCarbonTableIdentifier))
+ .getCarbonTablePath(newAbsoluteTableIdentifier))
allDictIdentifiers += dictionaryColumnUniqueIdentifier
- new ForwardDictionaryWrapper(
- storePath,
- dictionaryColumnUniqueIdentifier)
+ new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier)
} catch {
case _: Throwable => null
}
@@ -300,7 +296,7 @@ case class CarbonDictionaryDecoder(
null
}
}
- val dictionaryLoader = new DictionaryLoader(storePath, allDictIdentifiers.toList)
+ val dictionaryLoader = new DictionaryLoader(allDictIdentifiers.toList)
dicts.foreach { dict =>
if (dict != null) {
dict.setDictionaryLoader(dictionaryLoader)
@@ -467,7 +463,6 @@ class CarbonDecoderRDD(
aliasMap: CarbonAliasDecoderRelation,
prev: RDD[InternalRow],
output: Seq[Attribute],
- storePath: String,
serializedTableInfo: Array[Byte])
extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
@@ -516,7 +511,7 @@ class CarbonDecoderRDD(
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
- cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
forwardDictionaryCache)
val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
@@ -559,7 +554,7 @@ class CarbonDecoderRDD(
if (f._2 != null) {
try {
cache.get(new DictionaryColumnUniqueIdentifier(
- atiMap(f._1).getCarbonTableIdentifier,
+ atiMap(f._1),
f._2, f._3.getDataType,
CarbonStorePath.getCarbonTablePath(atiMap(f._1))))
} catch {
@@ -578,10 +573,9 @@ class CarbonDecoderRDD(
/**
* It is a wrapper around Dictionary, it is a work around to keep the dictionary serializable in
* case of codegen
- * @param storePath
+ * @param dictIdentifier Dictionary column unique identifier
*/
class ForwardDictionaryWrapper(
- val storePath: String,
dictIdentifier: DictionaryColumnUniqueIdentifier) extends Serializable {
var dictionary: Dictionary = null
@@ -610,8 +604,8 @@ class ForwardDictionaryWrapper(
/**
* It is Dictionary Loader class to load all dictionaries at a time instead of one by one.
*/
-class DictionaryLoader(storePath: String,
- allDictIdentifiers: List[DictionaryColumnUniqueIdentifier]) extends Serializable {
+class DictionaryLoader(allDictIdentifiers: List[DictionaryColumnUniqueIdentifier])
+ extends Serializable {
var isDictionaryLoaded = false
@@ -621,7 +615,7 @@ class DictionaryLoader(storePath: String,
if (!isDictionaryLoaded) {
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
- cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
allDicts = forwardDictionaryCache.getAll(allDictIdentifiers.asJava)
isDictionaryLoaded = true
val dictionaryTaskCleaner = TaskContext.get
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index de01c8d..fba590e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -222,10 +222,21 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
// check "tablePath" option
val tablePathOption = parameters.get("tablePath")
+ val dbName: String = parameters.getOrElse("dbName",
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+ val tableOption: Option[String] = parameters.get("tableName")
+ if (tableOption.isEmpty) {
+ throw new CarbonStreamException("Table creation failed. Table name is not specified")
+ }
+ val tableName = tableOption.get.toLowerCase()
+ if (tableName.contains(" ")) {
+ throw new CarbonStreamException("Table creation failed. Table name cannot contain blank " +
+ "space")
+ }
if (tablePathOption.isDefined) {
val sparkSession = sqlContext.sparkSession
val identifier: AbsoluteTableIdentifier =
- AbsoluteTableIdentifier.fromTablePath(tablePathOption.get)
+ AbsoluteTableIdentifier.from(tablePathOption.get, dbName, tableName)
val carbonTable =
CarbonEnv.getInstance(sparkSession).carbonMetastore.
createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
@@ -303,18 +314,20 @@ object CarbonSource {
val tableName: String = properties.getOrElse("tableName", "").toLowerCase
val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
val tableInfo: TableInfo = TableNewProcessor(model)
- val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName
+ val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+ CarbonEnv.getInstance(sparkSession).storePath)
+ val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
val schemaEvolutionEntry = new SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
tableInfo.getFactTable.getSchemaEvalution.
getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
val map = if (metaStore.isReadFromHiveMetaStore) {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
val schemaMetadataPath =
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(tableIdentifier.getStorePath)
+ tableInfo.setTablePath(tableIdentifier.getTablePath)
CarbonUtil.convertToMultiStringMap(tableInfo)
} else {
metaStore.saveToDisk(tableInfo, tablePath)
@@ -322,6 +335,7 @@ object CarbonSource {
}
properties.foreach(e => map.put(e._1, e._2))
map.put("tablePath", tablePath)
+ map.put("dbname", dbName)
map.asScala.toMap
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index f5c6cba..197b23b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -39,9 +39,11 @@ case class CarbonCreateTableCommand(
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
val storePath = CarbonEnv.getInstance(sparkSession).storePath
CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
+ checkSchemasModifiedTimeAndReloadTables()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
+ val dbLocation = GetDB.getDatabaseLocation(cm.databaseName, sparkSession, storePath)
+ val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + cm.tableName
val tbName = cm.tableName
val dbName = cm.databaseName
LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
@@ -70,11 +72,10 @@ case class CarbonCreateTableCommand(
sys.error(s"Table [$tbName] already exists under database [$dbName]")
}
} else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
+ val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tbName)
// Add Database to catalog and persist
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
+ val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
if (createDSTable) {
try {
val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
@@ -89,10 +90,9 @@ case class CarbonCreateTableCommand(
s""""$tablePath"$carbonSchemaString) """)
} catch {
case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
// call the drop table to delete the created table.
CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
+ .dropTable(tableIdentifier)(sparkSession)
LOGGER.audit(s"Table creation with Database name [$dbName] " +
s"and Table name [$tbName] failed")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 1bf17b3..0343393 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -53,14 +54,16 @@ case class CarbonDropTableCommand(
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetastore
- val tableIdentifier =
- AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
- dbName.toLowerCase, tableName.toLowerCase)
- catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+ val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+ CarbonEnv.getInstance(sparkSession).storePath)
+ val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+ val absoluteTableIdentifier = AbsoluteTableIdentifier
+ .from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
+ catalog.checkSchemasModifiedTimeAndReloadTables()
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
locksToBeAcquired foreach {
- lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+ lock => carbonLocks += CarbonLockUtil.getLockObject(absoluteTableIdentifier, lock)
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
val carbonTable: Option[CarbonTable] =
@@ -98,7 +101,7 @@ case class CarbonDropTableCommand(
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
+ .dropTable(absoluteTableIdentifier)(sparkSession)
// fires the event after dropping main table
val dropTablePostEvent: DropTablePostEvent =
@@ -127,8 +130,10 @@ case class CarbonDropTableCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
// delete the table folder
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
- val tableIdentifier =
- AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+ val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+ CarbonEnv.getInstance(sparkSession).storePath)
+ val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+ val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
val metadataFilePath =
CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
val fileType = FileFactory.getFileType(metadataFilePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index dc3b1ae..66f2756 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -63,14 +64,16 @@ case class CarbonDropDataMapCommand(
val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetastore
+ val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+ CarbonEnv.getInstance(sparkSession).storePath)
+ val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val tableIdentifier =
- AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
- dbName.toLowerCase, tableName.toLowerCase)
- catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+ AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
+ catalog.checkSchemasModifiedTimeAndReloadTables()
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
locksToBeAcquired foreach {
- lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+ lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
}
LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
val carbonTable: Option[CarbonTable] =
@@ -140,8 +143,10 @@ case class CarbonDropDataMapCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
// delete the table folder
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
- val tableIdentifier =
- AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+ val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+ CarbonEnv.getInstance(sparkSession).storePath)
+ val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+ val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index f87e734..947cea1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -71,7 +71,7 @@ case class AlterTableCompactionCommand(
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
+ carbonLoadModel.setTablePath(relation.tableMeta.carbonTable.getTablePath)
var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
@@ -131,7 +131,7 @@ case class AlterTableCompactionCommand(
// Just launch job to merge index and return
CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
- carbonLoadModel.getStorePath,
+ carbonLoadModel.getTablePath,
carbonTable)
return
}
@@ -172,7 +172,7 @@ case class AlterTableCompactionCommand(
} else {
// normal flow of compaction
val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.COMPACTION_LOCK
)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 1b16b88..8b0dab7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -36,10 +36,14 @@ case class CleanFilesCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
if (forceTableClean) {
+ val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+ val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+ CarbonEnv.getInstance(sparkSession).storePath)
+ // TODO: TABLEPATH
CarbonStore.cleanFiles(
- GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ dbName,
tableName,
- CarbonEnv.getInstance(sparkSession).storePath,
+ databaseLocation,
null,
forceTableClean)
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 3853b5f..777c169 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -192,7 +192,7 @@ case class LoadTableCommand(
} finally {
// Once the data load is successful delete the unwanted partition files
try {
- val partitionLocation = table.getStorePath + "/partition/" +
+ val partitionLocation = relation.tableMeta.storePath + "/partition/" +
table.getDatabaseName + "/" +
table.getFactTableName + "/"
val fileType = FileFactory.getFileType(partitionLocation)
@@ -231,7 +231,7 @@ case class LoadTableCommand(
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
.getCarbonTableIdentifier
val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonLoadModel.getStorePath, carbonTableIdentifier)
+ .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
val dimensions = carbonTable.getDimensionByTableName(
carbonTable.getFactTableName).asScala.toArray
@@ -245,7 +245,7 @@ case class LoadTableCommand(
dimensions,
carbonLoadModel,
sparkSession.sqlContext,
- carbonLoadModel.getStorePath,
+ carbonLoadModel.getTablePath,
dictFolderPath)
}
if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
@@ -253,7 +253,7 @@ case class LoadTableCommand(
GlobalDictionaryUtil
.generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getStorePath,
+ carbonLoadModel.getTablePath,
carbonTableIdentifier,
dictFolderPath,
dimensions,
@@ -289,7 +289,7 @@ case class LoadTableCommand(
}
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getStorePath,
+ carbonLoadModel.getTablePath,
columnar,
partitionStatus,
server,
@@ -332,11 +332,11 @@ case class LoadTableCommand(
GlobalDictionaryUtil.generateGlobalDictionary(
sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getStorePath,
+ carbonLoadModel.getTablePath,
dictionaryDataFrame)
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getStorePath,
+ carbonLoadModel.getTablePath,
columnar,
partitionStatus,
None,
@@ -351,8 +351,7 @@ case class LoadTableCommand(
model: DictionaryLoadModel,
noDictDimension: Array[CarbonDimension]): Unit = {
val sparkSession = sqlContext.sparkSession
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
- model.table)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.table)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
// read TableInfo
@@ -374,11 +373,12 @@ case class LoadTableCommand(
tableInfo, entry, carbonTablePath.getPath)(sparkSession)
// update the schema modified time
- metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
+ metastore.updateAndTouchSchemasUpdatedTime()
+ val identifier = model.table.getCarbonTableIdentifier
// update CarbonDataLoadSchema
- val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
- model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
+ identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 0c39dd4..a52008a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -76,14 +76,10 @@ object DeleteExecution {
.lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
asInstanceOf[CarbonRelation]
- val storeLocation = relation.tableMeta.storePath
- val absoluteTableIdentifier: AbsoluteTableIdentifier = new
- AbsoluteTableIdentifier(storeLocation,
- relation.tableMeta.carbonTableIdentifier)
- val tablePath = CarbonStorePath.getCarbonTablePath(
- storeLocation,
- absoluteTableIdentifier.getCarbonTableIdentifier)
- val factPath = tablePath.getFactDir
+ val absoluteTableIdentifier = relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier)
+ val factPath = carbonTablePath.getFactDir
val carbonTable = relation.tableMeta.carbonTable
var deleteStatus = true
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index 764deb7..a898822 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -60,7 +60,7 @@ private[sql] case class ProjectForDeleteCommand(
OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
val metadataLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
var lockStatus = false
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index e48693b..549c58f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -67,7 +67,7 @@ private[sql] case class ProjectForUpdateCommand(
OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
val metadataLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
var lockStatus = false
// get the current time stamp which should be same for delete and update.
@@ -83,9 +83,7 @@ private[sql] case class ProjectForUpdateCommand(
else {
throw new Exception("Table is locked for updation. Please try after some time")
}
- val tablePath = CarbonStorePath.getCarbonTablePath(
- carbonTable.getStorePath,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+ val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
// Get RDD.
dataSet = if (isPersistEnabled) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index 9b16060..acd9bd3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -66,8 +66,8 @@ case class AlterTableDropCarbonPartitionCommand(
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val storePath = relation.tableMeta.storePath
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+ val tablePath = relation.tableMeta.tablePath
+ carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
}
@@ -101,14 +101,14 @@ case class AlterTableDropCarbonPartitionCommand(
sys.error(s"Dropping range interval partition isn't support yet!")
}
partitionInfo.dropPartition(partitionIndex)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
- dbName, tableName, storePath)
+ dbName, tableName, tablePath)
val tableSchema = wrapperTableInfo.getFactTable
tableSchema.setPartitionInfo(partitionInfo)
wrapperTableInfo.setFactTable(tableSchema)
@@ -118,10 +118,10 @@ case class AlterTableDropCarbonPartitionCommand(
thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
.setTime_stamp(System.currentTimeMillis)
carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
- dbName, tableName, storePath)
+ dbName, tableName, tablePath)
CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
// update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+ carbonMetaStore.updateAndTouchSchemasUpdatedTime()
// sparkSession.catalog.refreshTable(tableName)
Seq.empty
}
@@ -152,7 +152,7 @@ case class AlterTableDropCarbonPartitionCommand(
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+ carbonLoadModel.setTablePath(relation.tableMeta.tablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
alterTableDropPartition(
@@ -224,7 +224,7 @@ case class AlterTableDropCarbonPartitionCommand(
for (thread <- threadArray) {
thread.join()
}
- val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+ val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
refresher.refreshSegments(validSegments.asJava)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index c3a918c..0973226 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -70,11 +70,11 @@ case class AlterTableSplitCarbonPartitionCommand(
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val storePath = relation.tableMeta.storePath
+ val tablePath = relation.tableMeta.tablePath
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
}
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+ carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
@@ -95,13 +95,13 @@ case class AlterTableSplitCarbonPartitionCommand(
updatePartitionInfo(partitionInfo, partitionIds)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
- dbName, tableName, storePath)
+ dbName, tableName, tablePath)
val tableSchema = wrapperTableInfo.getFactTable
tableSchema.setPartitionInfo(partitionInfo)
wrapperTableInfo.setFactTable(tableSchema)
@@ -109,10 +109,10 @@ case class AlterTableSplitCarbonPartitionCommand(
val thriftTable =
schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
- dbName, tableName, storePath)
+ dbName, tableName, tablePath)
CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
// update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+ carbonMetaStore.updateAndTouchSchemasUpdatedTime()
sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, Option(dbName)))
Seq.empty
}
@@ -153,14 +153,14 @@ case class AlterTableSplitCarbonPartitionCommand(
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
- val storePath = relation.tableMeta.storePath
+ val tablePath = relation.tableMeta.tablePath
val table = relation.tableMeta.carbonTable
val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
val dataLoadSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(storePath)
+ carbonLoadModel.setTablePath(tablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
alterTableSplitPartition(
@@ -232,7 +232,7 @@ case class AlterTableSplitCarbonPartitionCommand(
threadArray.foreach {
thread => thread.join()
}
- val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+ val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
refresher.refreshSegments(validSegments.asJava)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index d693061..3193310 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -332,15 +332,15 @@ object PreAggregateUtil {
locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
// get the latest carbon table and check for column existence
// read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
dbName,
tableName,
- carbonTable.getStorePath)
+ carbonTable.getTablePath)
numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
if (wrapperTableInfo.getDataMapSchemaList.asScala.
exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
@@ -399,7 +399,7 @@ object PreAggregateUtil {
val acquiredLocks = ListBuffer[ICarbonLock]()
try {
locksToBeAcquired.foreach { lock =>
- acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+ acquiredLocks += CarbonLockUtil.getLockObject(table.getAbsoluteTableIdentifier, lock)
}
acquiredLocks.toList
} catch {
@@ -439,13 +439,12 @@ object PreAggregateUtil {
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
carbonTable.getTableLastUpdatedTime
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
metastore
- .revertTableSchemaForPreAggCreationFailure(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+ .revertTableSchemaForPreAggCreationFailure(carbonTable.getAbsoluteTableIdentifier,
+ thriftTable)(sparkSession)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 7cc43d2..2132131 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -65,26 +65,25 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)
// get the latest carbon table and check for column existence
// read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
dbName,
tableName,
- carbonTable.getStorePath)
+ carbonTable.getTablePath)
newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
dbName,
wrapperTableInfo,
carbonTablePath,
carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath, sparkSession.sparkContext).process
+ sparkSession.sparkContext).process
// generate dictionary files for the newly added columns
new AlterTableAddColumnRDD(sparkSession.sparkContext,
newCols,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath).collect()
+ carbonTable.getAbsoluteTableIdentifier).collect()
timeStamp = System.currentTimeMillis
val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(timeStamp)
@@ -105,8 +104,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
new AlterTableDropColumnRDD(sparkSession.sparkContext,
newCols,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath).collect()
+ carbonTable.getAbsoluteTableIdentifier).collect()
AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
}
sys.error(s"Alter table add operation failed: ${e.getMessage}")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 023e061..e44899e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -74,8 +74,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
sys.error(s"Invalid Column: $columnName")
}
// read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
// maintain the added column for schema evolution history
var addColumnSchema: ColumnSchema = null
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 0b737bf..dae2d7b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -110,8 +110,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)
// read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val tableInfo: org.apache.carbondata.format.TableInfo =
metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
// maintain the deleted columns for schema evolution history
@@ -138,8 +138,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
// delete dictionary files for dictionary column and clear dictionary cache from memory
new AlterTableDropColumnRDD(sparkSession.sparkContext,
dictionaryColumns,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath).collect()
+ carbonTable.getAbsoluteTableIdentifier).collect()
// event will be fired before dropping the columns
val alterTableDropColumnPostEvent: AlterTableDropColumnPostEvent =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c2e5cf0..f1cce13 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -85,7 +85,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
.asInstanceOf[CarbonRelation].tableMeta
carbonTable = tableMeta.carbonTable
// invalid data map for the old table, see CARBON-1690
- val oldTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)
+ val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
// get the latest carbon table and check for column existence
val carbonTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
@@ -106,6 +106,21 @@ private[sql] case class CarbonAlterTableRenameCommand(
schemaEvolutionEntry.setTime_stamp(timeStamp)
renameBadRecords(oldTableName, newTableName, oldDatabaseName)
val fileType = FileFactory.getFileType(tableMetadataFile)
+ val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+ newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+ var newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, newTableIdentifier)
+
+ metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+ s"('tableName'='$newTableName', " +
+ s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+ // changed the rename order to deal with situation when carbon table and hive table
+ // will point to the same tablePath
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
.renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
@@ -115,23 +130,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
}
}
- val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
- newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
- val newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
+ newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
carbonTable.getCarbonTableIdentifier,
tableInfo,
schemaEvolutionEntry,
tableMeta.tablePath)(sparkSession)
- metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
- sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
- .runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
- sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
- .runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
- s"('tableName'='$newTableName', " +
- s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
carbonTable,
alterTableRenameModel,
@@ -150,7 +154,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
AlterTableUtil
.revertRenameTableChanges(oldTableIdentifier,
newTableName,
- carbonTable.getStorePath,
+ carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier.getTableId,
timeStamp)(
sparkSession)
@@ -167,7 +171,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
locksToBeAcquired,
oldDatabaseName,
newTableName,
- carbonTable.getStorePath)
+ carbonTable.getTablePath)
}
}
Seq.empty
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index c6ca950..c126b25 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -102,7 +102,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
CarbonAliasDecoderRelation(),
rdd,
output,
- CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
table.carbonTable.getTableInfo.serialize())
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index e39ba73..ef2e0a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsComm
import org.apache.spark.sql.execution.command.schema._
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
-import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
@@ -77,7 +76,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
_, child: LogicalPlan, overwrite, _) =>
ExecutedCommandExec(LoadTableByInsertCommand(relation, child, overwrite.enabled)) :: Nil
case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
- CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
ExecutedCommandExec(createDb) :: Nil
case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0343402..6d80a26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,11 +23,10 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -39,12 +38,11 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -192,11 +190,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
val tableName = identifier.getCarbonTableIdentifier.getTableName
- val storePath = identifier.getStorePath
+ val tablePath = identifier.getTablePath
val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
tableName.toLowerCase(), UUID.randomUUID().toString)
val carbonTablePath =
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
val fileType = FileFactory.getFileType(tableMetadataFile)
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -204,16 +202,16 @@ class CarbonFileMetastore extends CarbonMetaStore {
val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
+ .getCarbonTablePath(tablePath, carbonTableIdentifier).getSchemaFilePath
+ wrapperTableInfo.setTablePath(tablePath)
wrapperTableInfo
.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
- identifier.getStorePath,
+ identifier.getTablePath,
identifier.getTablePath,
carbonTable)
metadata.tablesMeta += tableMeta
@@ -237,16 +235,19 @@ class CarbonFileMetastore extends CarbonMetaStore {
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
tablePath: String) (sparkSession: SparkSession): String = {
- val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, oldTableIdentifier)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
}
+ val oldCarbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+ val newAbsoluteTableIdentifier = new AbsoluteTableIdentifier(CarbonUtil
+ .getNewTablePath(oldCarbonTablePath, newTableIdentifier), newTableIdentifier)
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
- absoluteTableIdentifier.getStorePath)
+ newAbsoluteTableIdentifier.getTablePath)
val identifier =
new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
@@ -254,10 +255,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
identifier)
- addTableCache(wrapperTableInfo,
- AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
- newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName))
+ addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
path
}
@@ -266,47 +264,44 @@ class CarbonFileMetastore extends CarbonMetaStore {
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param tablePath
* @param sparkSession
*/
def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- tablePath: String)(sparkSession: SparkSession): String = {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
carbonTableIdentifier.getDatabaseName,
carbonTableIdentifier.getTableName,
- tableIdentifier.getStorePath)
+ absoluteTableIdentifier.getTablePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
- wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+ wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- tableIdentifier.getCarbonTableIdentifier)
- addTableCache(wrapperTableInfo, tableIdentifier)
+ absoluteTableIdentifier.getCarbonTableIdentifier)
+ addTableCache(wrapperTableInfo, absoluteTableIdentifier)
path
}
- override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
- CarbonTableIdentifier,
- thriftTableInfo: org.apache.carbondata.format.TableInfo,
- tablePath: String)(sparkSession: SparkSession): String = {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
+ AbsoluteTableIdentifier,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo)
+ (sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
- carbonTableIdentifier.getDatabaseName,
- carbonTableIdentifier.getTableName,
- tableIdentifier.getStorePath)
+ absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ absoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+ absoluteTableIdentifier.getTablePath)
val childSchemaList = wrapperTableInfo.getDataMapSchemaList
childSchemaList.remove(childSchemaList.size() - 1)
- wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+ wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- tableIdentifier.getCarbonTableIdentifier)
- addTableCache(wrapperTableInfo, tableIdentifier)
+ absoluteTableIdentifier.getCarbonTableIdentifier)
+ addTableCache(wrapperTableInfo, absoluteTableIdentifier)
path
}
@@ -323,8 +318,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
val tableName = tableInfo.getFactTable.getTableName
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- tableInfo.setStorePath(identifier.getStorePath)
+ val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+ tableInfo.setTablePath(identifier.getTablePath)
createSchemaThriftFile(tableInfo,
thriftTableInfo,
identifier.getCarbonTableIdentifier)
@@ -335,19 +330,18 @@ class CarbonFileMetastore extends CarbonMetaStore {
* Generates schema string from TableInfo
*/
override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
- tablePath: String): String = {
- val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+ absoluteTableIdentifier: AbsoluteTableIdentifier): String = {
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
val schemaMetadataPath =
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(tableIdentifier.getStorePath)
+ tableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- addTableCache(tableInfo, tableIdentifier)
+ addTableCache(tableInfo, absoluteTableIdentifier)
CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
}
@@ -362,7 +356,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
thriftTableInfo: TableInfo,
carbonTableIdentifier: CarbonTableIdentifier): String = {
val carbonTablePath = CarbonStorePath.
- getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
+ getCarbonTablePath(tableInfo.getTablePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -374,7 +368,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime())
carbonTablePath.getPath
}
@@ -384,7 +378,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+ val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getTablePath,
absoluteTableIdentifier.getTablePath,
CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
metadata.tablesMeta += tableMeta
@@ -424,16 +418,16 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
def updateMetadataByThriftTable(schemaFilePath: String,
- tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+ tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = {
tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
.setTime_stamp(System.currentTimeMillis())
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
wrapperTableInfo
.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- wrapperTableInfo.setStorePath(storePath)
+ wrapperTableInfo.setTablePath(tablePath)
updateMetadataByWrapperTable(wrapperTableInfo)
}
@@ -446,17 +440,17 @@ class CarbonFileMetastore extends CarbonMetaStore {
FileFactory.isFileExist(tablePath, fileType)
} catch {
case e: Exception =>
- false
+ false
}
}
- def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
(sparkSession: SparkSession) {
- val dbName = tableIdentifier.database.get
- val tableName = tableIdentifier.table
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
+ val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+ val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+ .getMetadataDirectoryPath
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
@@ -467,21 +461,28 @@ class CarbonFileMetastore extends CarbonMetaStore {
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while drop we should refresh the schema modified time so that if any thing has changed
// in the other beeline need to update.
- checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+ checkSchemasModifiedTimeAndReloadTables()
removeTableFromMetadata(dbName, tableName)
-
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime())
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
+ val tableIdentifier = TableIdentifier(tableName, Option(dbName))
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
- DataMapStoreManager.getInstance().clearDataMaps(identifier)
+ DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
}
}
- private def getTimestampFileAndType(basePath: String) = {
+ private def getTimestampFileAndType() = {
+ var basePath = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
+ CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
+ basePath = CarbonUtil.checkAndAppendFileSystemURIScheme(basePath)
val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
val timestampFileType = FileFactory.getFileType(timestampFile)
+ if (!FileFactory.isFileExist(basePath, timestampFileType)) {
+ FileFactory.mkdirs(basePath, timestampFileType)
+ }
(timestampFile, timestampFileType)
}
@@ -494,8 +495,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
}
- def updateAndTouchSchemasUpdatedTime(basePath: String) {
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
+ def updateAndTouchSchemasUpdatedTime() {
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime())
}
@@ -504,10 +505,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
*
* @return
*/
- private def touchSchemaFileSystemTime(basePath: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
+ private def touchSchemaFileSystemTime(): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType()
if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $basePath")
+ LOGGER.audit(s"Creating timestamp file for $timestampFile")
FileFactory.createNewFile(timestampFile, timestampFileType)
}
FileFactory.getCarbonFile(timestampFile, timestampFileType)
@@ -518,9 +519,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
.getLastModifiedTime
}
- def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
- val (timestampFile, timestampFileType) =
- getTimestampFileAndType(storePath)
+ def checkSchemasModifiedTimeAndReloadTables() {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType()
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a500f00..dedaf1c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -18,7 +18,8 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
@@ -33,7 +34,7 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.format
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
/**
* Metastore to store carbonschema in hive
@@ -56,7 +57,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
if (info != null) {
val table = CarbonTable.buildFromTableInfo(info)
val meta = new TableMeta(table.getCarbonTableIdentifier,
- absIdentifier.getStorePath, absIdentifier.getTablePath, table)
+ absIdentifier.getTablePath, absIdentifier.getTablePath, table)
CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
CarbonSparkUtil.createSparkMeta(table), meta)
} else {
@@ -70,25 +71,25 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
tableExists(tableIdentifier)(sparkSession)
}
- override def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+ override def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
(sparkSession: SparkSession): Unit = {
- val dbName = tableIdentifier.database.get
- val tableName = tableIdentifier.table
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
- checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+ checkSchemasModifiedTimeAndReloadTables()
removeTableFromMetadata(dbName, tableName)
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
+ val tableIdentifier = TableIdentifier(tableName, Option(dbName))
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
- DataMapStoreManager.getInstance().clearDataMaps(identifier)
+ DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
}
- override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+ override def checkSchemasModifiedTimeAndReloadTables() {
// do nothing now
}
@@ -125,14 +126,13 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
tablePath: String)
(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
}
updateHiveMetaStoreForAlter(newTableIdentifier,
oldTableIdentifier,
thriftTableInfo,
- identifier.getStorePath,
+ tablePath,
sparkSession,
schemaConverter)
}
@@ -142,19 +142,18 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
*
* @param newTableIdentifier
* @param thriftTableInfo
- * @param carbonStorePath
+ * @param carbonTablePath
* @param sparkSession
*/
override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- carbonStorePath: String)(sparkSession: SparkSession): String = {
+ carbonTablePath: String)(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val identifier = AbsoluteTableIdentifier.fromTablePath(carbonStorePath)
updateHiveMetaStoreForDataMap(newTableIdentifier,
oldTableIdentifier,
thriftTableInfo,
- identifier.getStorePath,
+ carbonTablePath,
sparkSession,
schemaConverter)
}
@@ -165,13 +164,14 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
carbonStorePath: String,
sparkSession: SparkSession,
schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+ val tablePath = CarbonUtil.getNewTablePath(new Path(carbonStorePath), newTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, newTableIdentifier)
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
- carbonStorePath)
- wrapperTableInfo.setStorePath(carbonStorePath)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+ carbonTablePath.toString)
+ wrapperTableInfo.setTablePath(carbonStorePath)
val schemaMetadataPath =
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -189,16 +189,17 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: format.TableInfo,
- carbonStorePath: String,
+ tablePath: String,
sparkSession: SparkSession,
schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+ val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier)
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
- carbonStorePath)
- wrapperTableInfo.setStorePath(carbonStorePath)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+ newTablePath)
+ wrapperTableInfo.setTablePath(newTablePath)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(newTablePath, newTableIdentifier)
val schemaMetadataPath =
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -207,7 +208,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
removeTableFromMetadata(dbName, tableName)
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
+ carbonTablePath.getPath
}
/**
@@ -219,32 +220,31 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
*/
override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: format.TableInfo,
- tablePath: String)
+ identifier: AbsoluteTableIdentifier)
(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
updateHiveMetaStoreForAlter(carbonTableIdentifier,
carbonTableIdentifier,
thriftTableInfo,
- identifier.getStorePath,
+ identifier.getTablePath,
sparkSession,
schemaConverter)
}
- override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
- CarbonTableIdentifier,
- thriftTableInfo: org.apache.carbondata.format.TableInfo,
- tablePath: String)(sparkSession: SparkSession): String = {
+ override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
+ AbsoluteTableIdentifier,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo)
+ (sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val childSchemas = thriftTableInfo.dataMapSchemas
childSchemas.remove(childSchemas.size())
+ val carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier
updateHiveMetaStoreForAlter(carbonTableIdentifier,
carbonTableIdentifier,
thriftTableInfo,
- identifier.getStorePath,
+ absoluteTableIdentifier.getTablePath,
sparkSession,
schemaConverter)
}