You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/01 07:45:45 UTC
[2/4] carbondata git commit: [CARBONDATA-2025] Unify all path
construction through CarbonTablePath static method
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d96a051..90a4223 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -49,7 +49,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.comparator.Comparator
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -634,7 +634,7 @@ object CommonUtil {
def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
- val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
+ val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
val details = SegmentStatusManager.readLoadMetadata(metadataPath)
model.setLoadMetadataDetails(new util.ArrayList[LoadMetadataDetails](details.toList.asJava))
}
@@ -848,20 +848,18 @@ object CommonUtil {
val fileType = FileFactory.getFileType(databaseLocation)
if (FileFactory.isFileExist(databaseLocation, fileType)) {
val file = FileFactory.getCarbonFile(databaseLocation, fileType)
- if (file.isDirectory) {
- val tableFolders = file.listFiles()
- tableFolders.foreach { tableFolder =>
- if (tableFolder.isDirectory) {
- val tablePath = databaseLocation +
- CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
- val identifier =
- AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
- val tableStatusFile = carbonTablePath.getTableStatusFilePath
- if (FileFactory.isFileExist(tableStatusFile, fileType)) {
- val segmentStatusManager = new SegmentStatusManager(identifier)
- val carbonLock = segmentStatusManager.getTableStatusLock
- try {
+ if (file.isDirectory) {
+ val tableFolders = file.listFiles()
+ tableFolders.foreach { tableFolder =>
+ if (tableFolder.isDirectory) {
+ val tablePath = databaseLocation +
+ CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+ val identifier =
+ AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
+ val tableStatusFile =
+ CarbonTablePath.getTableStatusFilePath(tablePath)
+ if (FileFactory.isFileExist(tableStatusFile, fileType)) {
+ try {
val carbonTable = CarbonMetadata.getInstance
.getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
DataLoadingUtil.deleteLoadsAndUpdateMetadata(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 5e9f7fe..c8331f2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -344,8 +344,8 @@ object DataLoadingUtil {
def deleteLoadsAndUpdateMetadata(
isForceDeletion: Boolean,
carbonTable: CarbonTable): Unit = {
- if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
- val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+ if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
+ val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val carbonTableStatusLock =
CarbonLockFactory.getCarbonLockObj(
@@ -359,7 +359,7 @@ object DataLoadingUtil {
absoluteTableIdentifier,
isForceDeletion,
details,
- carbonTable.getMetaDataFilepath
+ carbonTable.getMetadataPath
)
var updationCompletionStaus = false
@@ -372,7 +372,7 @@ object DataLoadingUtil {
// read latest table status again.
val latestMetadata = SegmentStatusManager
- .readLoadMetadata(carbonTable.getMetaDataFilepath)
+ .readLoadMetadata(carbonTable.getMetadataPath)
// update the metadata details from old to new status.
val latestStatus = CarbonLoaderUtil
@@ -397,7 +397,7 @@ object DataLoadingUtil {
if (updationCompletionStaus) {
DeleteLoadFolders
.physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
- carbonTable.getMetaDataFilepath, isForceDeletion)
+ carbonTable.getMetadataPath, isForceDeletion)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 5f44e43..bbf345c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.reader.CarbonDictionaryReader
import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDictionaryWriter
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
@@ -308,7 +308,7 @@ object GlobalDictionaryUtil {
}
val primDimensions = primDimensionsBuffer.map { x => x }.toArray
val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
- getDictionaryDetail(dictFolderPath, primDimensions, table, carbonLoadModel.getTablePath)
+ getDictionaryDetail(dictFolderPath, primDimensions, carbonLoadModel.getTablePath)
val dictFilePaths = dictDetail.dictFilePaths
val dictFileExists = dictDetail.dictFileExists
val columnIdentifier = dictDetail.columnIdentifiers
@@ -398,10 +398,6 @@ object GlobalDictionaryUtil {
}
}
- // Hack for spark2 integration
- var updateTableMetadataFunc: (CarbonLoadModel, SQLContext, DictionaryLoadModel,
- Array[CarbonDimension]) => Unit = _
-
/**
* check whether global dictionary have been generated successfully or not
*
@@ -705,10 +701,7 @@ object GlobalDictionaryUtil {
try {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- // create dictionary folder if not exists
- val tablePath = carbonLoadModel.getTablePath
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
- val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
+ val dictfolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
// columns which need to generate global dictionary file
val dimensions = carbonTable.getDimensionByTableName(
carbonTable.getTableName).asScala.toArray
@@ -845,12 +838,11 @@ object GlobalDictionaryUtil {
* This method will write dictionary file, sortindex file and dictionary meta for new dictionary
* column with default value
*
- * @param carbonTablePath
* @param columnSchema
* @param absoluteTableIdentifier
* @param defaultValue
*/
- def loadDefaultDictionaryValueForNewColumn(carbonTablePath: CarbonTablePath,
+ def loadDefaultDictionaryValueForNewColumn(
columnSchema: ColumnSchema,
absoluteTableIdentifier: AbsoluteTableIdentifier,
defaultValue: String): Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 9a0098e..3b588df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema._
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
import org.apache.carbondata.core.service.CarbonCommonFactory
+import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -207,8 +208,7 @@ class AlterTableColumnSchemaGenerator(
alterTableModel: AlterTableAddColumnsModel,
dbName: String,
tableInfo: TableInfo,
- carbonTablePath: CarbonTablePath,
- tableIdentifier: CarbonTableIdentifier,
+ tableIdentifier: AbsoluteTableIdentifier,
sc: SparkContext) {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -369,7 +369,7 @@ object TableNewProcessor {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
columnSchema.setEncodingList(encoders)
- val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+ val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
columnSchema.setColumnUniqueId(columnUniqueId)
columnSchema.setColumnReferenceId(columnUniqueId)
@@ -433,7 +433,7 @@ class TableNewProcessor(cm: TableModel) {
}
}
columnSchema.setEncodingList(encoders)
- val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+ val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
columnSchema.setColumnUniqueId(columnUniqueId)
columnSchema.setColumnReferenceId(columnUniqueId)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 3c871db..1656efa 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -176,8 +176,6 @@ object PartitionUtils {
getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIds,
partitionInfo, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable).asScala
val pathList: util.List[String] = new util.ArrayList[String]()
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
- val carbonTablePath = new CarbonTablePath(carbonTableIdentifier, tablePath)
tableBlockInfoList.foreach{ tableBlockInfo =>
val path = tableBlockInfo.getFilePath
val timestamp = CarbonTablePath.DataFileUtil.getTimeStampFromFileName(path)
@@ -190,8 +188,8 @@ object PartitionUtils {
val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
- val indexFilePath = carbonTablePath.getCarbonIndexFilePath(
- String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
+ val indexFilePath = CarbonTablePath.getCarbonIndexFilePath(
+ tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
timestamp, version)
// indexFilePath could be duplicated when multiple data file related to one index file
if (indexFilePath != null && !pathList.contains(indexFilePath)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 5f8f389..adf33ff 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.OperationContext
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
@@ -70,7 +70,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
sqlContext.sparkSession, loadCommand.logicalPlan.get))
loadCommand.processData(sqlContext.sparkSession)
val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
- carbonTable.getMetaDataFilepath)
+ carbonTable.getMetadataPath)
val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
case load if loadMetaDataDetails.contains(load) =>
load.setMergedLoadName(mergedLoadName)
@@ -79,12 +79,9 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
load
case other => other
}
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- .getAbsoluteTableIdentifier)
- SegmentStatusManager
- .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
- updatedLoadMetaDataDetails)
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(carbonLoadModel.getTablePath),
+ updatedLoadMetaDataDetails)
carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
} finally {
// check if any other segments needs compaction on in case of MINOR_COMPACTION.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 655e5a0..f47c9bc 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -57,7 +57,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
@@ -70,7 +70,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
/**
* This is the factory class which can create different RDD depends on user needs.
@@ -137,7 +137,7 @@ object CarbonDataRDDFactory {
LOGGER.error("Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
CarbonCompactionUtil
- .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+ .createCompactionRequiredFile(carbonTable.getMetadataPath, compactionType)
// throw exception only in case of DDL trigger.
if (compactionModel.isDDLTrigger) {
CarbonException.analysisException(
@@ -205,7 +205,7 @@ object CarbonDataRDDFactory {
s"${ tableForCompaction.getDatabaseName }." +
s"${ tableForCompaction.getTableName}")
val table: CarbonTable = tableForCompaction
- val metadataPath = table.getMetaDataFilepath
+ val metadataPath = table.getMetadataPath
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
val newCarbonLoadModel = prepareCarbonLoadModel(table)
@@ -596,15 +596,13 @@ object CarbonDataRDDFactory {
(row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
- carbonTable.getMetaDataFilepath)
+ carbonTable.getMetadataPath)
.filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
val segmentIds = loadMetadataDetails.map(_.getLoadName)
val segmentIdIndex = segmentIds.zipWithIndex.toMap
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getTablePath,
- carbonTable.getCarbonTableIdentifier)
val segmentId2maxTaskNo = segmentIds.map { segId =>
- (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
+ (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath))
}.toMap
class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a0c8f65..ddc8586 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -156,7 +156,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
}
val carbonMergerMapping = CarbonMergerMapping(
tablePath,
- carbonTable.getMetaDataFilepath,
+ carbonTable.getMetadataPath,
mergedLoadName,
databaseName,
factTableName,
@@ -169,7 +169,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
partitionMapper)
carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
carbonLoadModel.setLoadMetadataDetails(
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath).toList.asJava)
// trigger event for compaction
val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
AlterTableCompactionPreEvent(sqlContext.sparkSession,
@@ -240,10 +240,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
CarbonDataMergerUtil
.updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
- carbonTable.getMetaDataFilepath,
+ carbonTable.getMetadataPath,
carbonLoadModel)) ||
CarbonDataMergerUtil
- .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
+ .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetadataPath,
mergedLoadNumber, carbonLoadModel, compactionType)
if (!statusFileUpdation) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index fb0f9fe..febb83e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -37,8 +37,9 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -237,7 +238,7 @@ case class CarbonAlterTableCompactionCommand(
readFileFooterFromCarbonDataFile = true)
val carbonMergerMapping = CarbonMergerMapping(carbonTable.getTablePath,
- carbonTable.getMetaDataFilepath,
+ carbonTable.getMetadataPath,
"",
carbonTable.getDatabaseName,
carbonTable.getTableName,
@@ -312,9 +313,10 @@ case class CarbonAlterTableCompactionCommand(
true)(sparkSession,
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
// 5. remove checkpoint
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingCheckpointDir))
- FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingLogDir))
+ FileFactory.deleteAllFilesOfDir(
+ new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))
+ FileFactory.deleteAllFilesOfDir(
+ new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
} else {
val msg = "Failed to close streaming table, because streaming is locked for table " +
carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 226a625..c7b59d4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -59,7 +59,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
@@ -67,8 +67,8 @@ import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.exception.{NoRetryException}
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel}
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
@@ -217,8 +217,7 @@ case class CarbonLoadDataCommand(
carbonLoadModel.setUseOnePass(false)
}
// Create table and metadata folders if not exist
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
- val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+ val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)
@@ -309,9 +308,7 @@ case class CarbonLoadDataCommand(
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
.getCarbonTableIdentifier
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
- val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+ val dictFolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
val dimensions = carbonTable.getDimensionByTableName(
carbonTable.getTableName).asScala.toArray
val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -880,4 +877,5 @@ case class CarbonLoadDataCommand(
val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
(dataFrameWithTupleId)
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index f8f215f..1e5885e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -45,7 +45,7 @@ case class CarbonShowLoadsCommand(
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
CarbonStore.showSegments(
limit,
- carbonTable.getMetaDataFilepath
+ carbonTable.getMetadataPath
)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 2983ea4..90ff3b4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
import org.apache.carbondata.hadoop.util.SchemaReader
@@ -62,19 +62,18 @@ case class RefreshCarbonTableCommand(
// 2.2.1 validate that all the aggregate tables are copied at the store location.
// 2.2.2 Register the aggregate tables
val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
- val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+ val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
// 2.1 check if the table already register with hive then ignore and continue with the next
// schema
if (!sparkSession.sessionState.catalog.listTables(databaseName)
.exists(_.table.equalsIgnoreCase(tableName))) {
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
// check the existence of the schema file to know its a carbon table
- val schemaFilePath = carbonTablePath.getSchemaFilePath
+ val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
// if schema file does not exist then the table will either non carbon table or stale
// carbon table
if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
// read TableInfo
- val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
+ val tableInfo = SchemaReader.getTableInfo(identifier)
// 2.2 register the table with the hive check if the table being registered has
// aggregate table then do the below steps
// 2.2.1 validate that all the aggregate tables are copied at the store location.
@@ -98,7 +97,7 @@ case class RefreshCarbonTableCommand(
// Register partitions to hive metastore in case of hive partitioning carbon table
if (tableInfo.getFactTable.getPartitionInfo != null &&
tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
- registerAllPartitionsToHive(absoluteTableIdentifier, sparkSession)
+ registerAllPartitionsToHive(identifier, sparkSession)
}
} else {
LOGGER.audit(
@@ -177,9 +176,7 @@ case class RefreshCarbonTableCommand(
dataMapSchemaList.asScala.foreach(dataMap => {
val tableName = dataMap.getChildSchema.getTableName
val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath,
- new CarbonTableIdentifier(dbName, tableName, dataMap.getChildSchema.getTableId))
- val schemaFilePath = carbonTablePath.getSchemaFilePath
+ val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
try {
fileExist = FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
} catch {
@@ -190,7 +187,7 @@ case class RefreshCarbonTableCommand(
return fileExist;
}
})
- return true
+ true
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a8efb84..58456b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.processing.exception.MultipleMatchingException
@@ -67,8 +67,7 @@ object DeleteExecution {
val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
- val factPath = carbonTablePath.getFactDir
+ val factPath = CarbonTablePath.getFactDir(carbonTable.getTablePath)
var segmentsTobeDeleted = Seq.empty[String]
val deleteRdd = if (isUpdateOperation) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 114c25d..b53c609 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.partition.DropPartitionCallable
@@ -69,8 +69,8 @@ case class CarbonAlterTableDropPartitionCommand(
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
}
- val table = relation.carbonTable
- val partitionInfo = table.getPartitionInfo(tableName)
+ val carbonTable = relation.carbonTable
+ val partitionInfo = carbonTable.getPartitionInfo(tableName)
if (partitionInfo == null) {
sys.error(s"Table $tableName is not a partition table.")
}
@@ -95,11 +95,9 @@ case class CarbonAlterTableDropPartitionCommand(
sys.error(s"Dropping range interval partition isn't support yet!")
}
partitionInfo.dropPartition(partitionIndex)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- // read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ // read TableInfo
+ val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
dbName, tableName, tablePath)
@@ -112,11 +110,11 @@ case class CarbonAlterTableDropPartitionCommand(
thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
.setTime_stamp(System.currentTimeMillis)
carbonMetaStore.updateTableSchemaForAlter(
- table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
thriftTable,
null,
- table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+ carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
// update the schema modified time
carbonMetaStore.updateAndTouchSchemasUpdatedTime()
// sparkSession.catalog.refreshTable(tableName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index bafc96a..84779cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -72,8 +72,8 @@ case class CarbonAlterTableSplitPartitionCommand(
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
}
- val table = relation.carbonTable
- val partitionInfo = table.getPartitionInfo(tableName)
+ val carbonTable = relation.carbonTable
+ val partitionInfo = carbonTable.getPartitionInfo(tableName)
val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
// keep a copy of partitionIdList before update partitionInfo.
// will be used in partition data scan
@@ -88,10 +88,8 @@ case class CarbonAlterTableSplitPartitionCommand(
updatePartitionInfo(partitionInfo, partitionIds)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
dbName, tableName, tablePath)
@@ -101,12 +99,12 @@ case class CarbonAlterTableSplitPartitionCommand(
wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
val thriftTable =
schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- carbonMetaStore
- .updateTableSchemaForAlter(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- thriftTable,
- null,
- table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+ carbonMetaStore.updateTableSchemaForAlter(
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ thriftTable,
+ null,
+ carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
// update the schema modified time
carbonMetaStore.updateAndTouchSchemasUpdatedTime()
Seq.empty
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dbbf90c..fed4235 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -183,7 +183,7 @@ case class CreatePreAggregateTableCommand(
// need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
// table.
DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable)
- val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
+ val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
throw new UnsupportedOperationException(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index dac5d5e..feef7a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,11 +16,12 @@
*/
package org.apache.spark.sql.execution.command.preaaggregate
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
+import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF}
@@ -40,7 +41,6 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -423,9 +423,7 @@ object PreAggregateUtil {
locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
// get the latest carbon table and check for column existence
// read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(
- carbonTable.getAbsoluteTableIdentifier)
- val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
thriftTableInfo,
@@ -528,8 +526,7 @@ object PreAggregateUtil {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
carbonTable.getTableLastUpdatedTime
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
metastore.revertTableSchemaForPreAggCreationFailure(
carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index f3f01bb..07917d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -28,7 +28,6 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
@@ -64,9 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
// get the latest carbon table and check for column existence
// read the latest schema file
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
@@ -76,8 +73,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
dbName,
wrapperTableInfo,
- carbonTablePath,
- carbonTable.getCarbonTableIdentifier,
+ carbonTable.getAbsoluteTableIdentifier,
sparkSession.sparkContext).process
// generate dictionary files for the newly added columns
new AlterTableAddColumnRDD(sparkSession.sparkContext,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 9bea935..fa8003e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
@@ -74,9 +74,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
sys.error(s"Invalid Column: $columnName")
}
// read the latest schema file
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
// maintain the added column for schema evolution history
var addColumnSchema: ColumnSchema = null
var deletedColumnSchema: ColumnSchema = null
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 0319d9e..d848eb5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -98,10 +98,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)
// read the latest schema file
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val tableInfo: org.apache.carbondata.format.TableInfo =
- metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ metastore.getThriftTableInfo(carbonTable)(sparkSession)
// maintain the deleted columns for schema evolution history
var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
val columnSchemaList = tableInfo.fact_table.table_columns.asScala
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index dd34f08..7a56dbf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -98,8 +98,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
// get the latest carbon table and check for column existence
- val oldTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
- val tableMetadataFile = oldTablePath.getPath
+ val tableMetadataFile = oldTableIdentifier.getTablePath
val operationContext = new OperationContext
// TODO: Pass new Table Path in pre-event.
val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
@@ -109,7 +108,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
val tableInfo: org.apache.carbondata.format.TableInfo =
- metastore.getThriftTableInfo(oldTablePath)(sparkSession)
+ metastore.getThriftTableInfo(carbonTable)(sparkSession)
val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
schemaEvolutionEntry.setTableName(newTableName)
timeStamp = System.currentTimeMillis()
@@ -118,7 +117,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
val fileType = FileFactory.getFileType(tableMetadataFile)
val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
- var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+ var newTablePath = CarbonTablePath.getNewTablePath(
+ oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
.getClient()
@@ -133,9 +133,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
// changed the rename order to deal with situation when carbon table and hive table
// will point to the same tablePath
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val rename = FileFactory.getCarbonFile(oldTablePath.getPath, fileType)
- .renameForce(oldTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
- newTableName)
+ val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType)
+ .renameForce(
+ CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName))
if (!rename) {
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
@@ -163,13 +163,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
case e: Exception =>
LOGGER.error(e, "Rename table failed: " + e.getMessage)
if (carbonTable != null) {
- AlterTableUtil
- .revertRenameTableChanges(oldTableIdentifier,
- newTableName,
- carbonTable.getTablePath,
- carbonTable.getCarbonTableIdentifier.getTableId,
- timeStamp)(
- sparkSession)
+ AlterTableUtil.revertRenameTableChanges(
+ newTableName,
+ carbonTable,
+ timeStamp)(
+ sparkSession)
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
}
sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b44dc7e..fd09e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -46,7 +46,8 @@ import org.apache.carbondata.core.metadata.schema
import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -209,11 +210,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
val tableName = identifier.getCarbonTableIdentifier.getTableName
val tablePath = identifier.getTablePath
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
- tableName.toLowerCase(), UUID.randomUUID().toString)
- val carbonTablePath =
- CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
val fileType = FileFactory.getFileType(tableMetadataFile)
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
@@ -240,13 +237,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
tablePath: String) (sparkSession: SparkSession): String = {
- val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
+ val identifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
}
- val oldTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
- val newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+ val newTablePath = CarbonTablePath.getNewTablePath(
+ identifier.getTablePath, newTableIdentifier.getTableName)
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
thriftTableInfo,
newTableIdentifier.getDatabaseName,
@@ -341,8 +338,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
private def createSchemaThriftFile(
identifier: AbsoluteTableIdentifier,
thriftTableInfo: TableInfo): String = {
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
+ val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
val fileType = FileFactory.getFileType(schemaMetadataPath)
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
@@ -356,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
updateSchemasUpdatedTime(touchSchemaFileSystemTime())
- carbonTablePath.getPath
+ identifier.getTablePath
}
protected def addTableCache(
@@ -431,8 +427,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
(sparkSession: SparkSession) {
val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
- .getMetadataDirectoryPath
+ val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath)
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
@@ -528,9 +523,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
metadata.carbonTables
- override def getThriftTableInfo(tablePath: CarbonTablePath)
+ override def getThriftTableInfo(carbonTable: CarbonTable)
(sparkSession: SparkSession): TableInfo = {
- val tableMetadataFile = tablePath.getSchemaFilePath
+ val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)
CarbonUtil.readSchemaFile(tableMetadataFile)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 759471b..44f731e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetad
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -96,12 +96,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
Seq()
}
- override def getThriftTableInfo(tablePath: CarbonTablePath)
+ override def getThriftTableInfo(carbonTable: CarbonTable)
(sparkSession: SparkSession): format.TableInfo = {
- val identifier = tablePath.getCarbonTableIdentifier
- val relation = lookupRelation(TableIdentifier(identifier.getTableName,
- Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation]
- val carbonTable = relation.metaData.carbonTable
val schemaConverter = new ThriftWrapperSchemaConverterImpl
schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
carbonTable.getDatabaseName,
@@ -148,7 +144,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
thriftTableInfo: org.apache.carbondata.format.TableInfo,
carbonTablePath: String)(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
- updateHiveMetaStoreForAlter(newTableIdentifier,
+ updateHiveMetaStoreForAlter(
+ newTableIdentifier,
oldTableIdentifier,
thriftTableInfo,
carbonTablePath,
@@ -163,7 +160,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
sparkSession: SparkSession,
schemaConverter: ThriftWrapperSchemaConverterImpl) = {
val newTablePath =
- CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
+ CarbonTablePath.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
thriftTableInfo,
newTableIdentifier.getDatabaseName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 93c7c09..0645040 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -143,7 +143,7 @@ trait CarbonMetaStore {
def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
- def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
+ def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo
def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index b8608f4..a722838 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
/**
* Represents logical plan for one carbon table
@@ -209,9 +209,7 @@ case class CarbonRelation(
.getValidAndInvalidSegments.getValidSegments.isEmpty) {
sizeInBytesLocalValue = 0L
} else {
- val tablePath = CarbonStorePath.getCarbonTablePath(
- carbonTable.getTablePath,
- carbonTable.getCarbonTableIdentifier).getPath
+ val tablePath = carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
if (FileFactory.isFileExist(tablePath, fileType)) {
// get the valid segments
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8ebd5a9..bc36e9c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -36,7 +36,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -176,41 +177,28 @@ object AlterTableUtil {
/**
* This method reverts the changes to the schema if the rename table command fails.
- *
- * @param oldTableIdentifier
- * @param newTableName
- * @param timeStamp
- * @param sparkSession
*/
- def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
+ def revertRenameTableChanges(
newTableName: String,
- tablePath: String,
- tableId: String,
+ oldCarbonTable: CarbonTable,
timeStamp: Long)
(sparkSession: SparkSession): Unit = {
- val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val oldCarbonTableIdentifier = new CarbonTableIdentifier(database,
- oldTableIdentifier.table, tableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
+ val tablePath = oldCarbonTable.getTablePath
+ val tableId = oldCarbonTable.getCarbonTableIdentifier.getTableId
+ val oldCarbonTableIdentifier = oldCarbonTable.getCarbonTableIdentifier
+ val database = oldCarbonTable.getDatabaseName
val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
- val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableName)
+ val newTablePath = CarbonTablePath.getNewTablePath(tablePath, newTableName)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val fileType = FileFactory.getFileType(tablePath)
if (FileFactory.isFileExist(tablePath, fileType)) {
- val tableInfo = if (metastore.isReadFromHiveMetaStore) {
- // In case of hive metastore we first update the carbonschema inside old table only.
- metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath,
- new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession)
- } else {
- metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
- }
+ val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession)
val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
- LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
- FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
- .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
- oldTableIdentifier.table)
+ LOGGER.error(s"Reverting changes for $database.${oldCarbonTable.getTableName}")
+ FileFactory.getCarbonFile(tablePath, fileType)
+ .renameForce(CarbonTablePath.getNewTablePath(tablePath, oldCarbonTable.getTableName))
val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
newTablePath,
newCarbonTableIdentifier)
@@ -233,9 +221,7 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
- carbonTable.getCarbonTableIdentifier)
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -260,9 +246,7 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
- carbonTable.getCarbonTableIdentifier)
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -293,9 +277,7 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
- carbonTable.getCarbonTableIdentifier)
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -344,9 +326,7 @@ object AlterTableUtil {
carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
// get the latest carbon table
// read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
- carbonTable.getCarbonTableIdentifier)
- val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
thriftTableInfo,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index aadee81..0bdef8a 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -856,9 +856,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
}
def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
- val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
- carbonTable.getTablePath)
- val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 56c5747..71c5477 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.TableOptionConstant
@@ -65,9 +65,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
carbonLoadModel.setCsvHeaderColumns(
CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
// Create table and metadata folders if not exist
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
- val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+ val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index f9519f8..a465251 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.util.CarbonLoaderUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index a1b39d8..c0e1781 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.TableOptionConstant
@@ -179,9 +179,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
carbonLoadModel.setMaxColumns("100")
// Create table and metadata folders if not exist
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
- val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+ val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)