You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 06:48:09 UTC
[2/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 822455c..64a066c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -47,9 +47,7 @@ case class CarbonDataMapShowCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
if (schemaList != null && schemaList.size() > 0) {
schemaList.asScala.map { s =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 66f2756..f34afbf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events._
@@ -60,12 +61,11 @@ case class CarbonDropDataMapCommand(
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val identifier = TableIdentifier(tableName, Option(dbName))
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetastore
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val tableIdentifier =
AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
@@ -76,20 +76,19 @@ case class CarbonDropDataMapCommand(
lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
}
LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
- val carbonTable: Option[CarbonTable] =
- catalog.getTableFromMetadataCache(dbName, tableName) match {
- case Some(tableMeta) => Some(tableMeta.carbonTable)
- case None => try {
- Some(catalog.lookupRelation(identifier)(sparkSession)
- .asInstanceOf[CarbonRelation].metaData.carbonTable)
- } catch {
- case ex: NoSuchTableException =>
- if (!ifExistsSet) {
- throw ex
- }
- None
- }
+ var carbonTable: Option[CarbonTable] =
+ catalog.getTableFromMetadataCache(dbName, tableName)
+ if (carbonTable.isEmpty) {
+ try {
+ carbonTable = Some(catalog.lookupRelation(identifier)(sparkSession)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable)
+ } catch {
+ case ex: NoSuchTableException =>
+ if (!ifExistsSet) {
+ throw ex
+ }
}
+ }
if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
@@ -144,7 +143,7 @@ case class CarbonDropDataMapCommand(
// delete the table folder
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 947cea1..2f04feb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -57,21 +57,21 @@ case class AlterTableCompactionCommand(
if (relation == null) {
sys.error(s"Table $databaseName.$tableName does not exist")
}
- if (null == relation.tableMeta.carbonTable) {
+ if (null == relation.carbonTable) {
LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
sys.error(s"alter table failed. table not found: $databaseName.$tableName")
}
val carbonLoadModel = new CarbonLoadModel()
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setTableName(table.getFactTableName)
+ val table = relation.carbonTable
+ carbonLoadModel.setTableName(table.getTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setTablePath(relation.tableMeta.carbonTable.getTablePath)
+ carbonLoadModel.setTableName(relation.carbonTable.getTableName)
+ carbonLoadModel.setDatabaseName(relation.carbonTable.getDatabaseName)
+ carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 2003bb1..32d6b80 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -37,9 +37,7 @@ case class CarbonShowLoadsCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
CarbonStore.showSegments(
GetDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 8b0dab7..58e33b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, Runn
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
case class CleanFilesCommand(
@@ -38,7 +39,7 @@ case class CleanFilesCommand(
if (forceTableClean) {
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
// TODO: TAABLEPATH
CarbonStore.cleanFiles(
dbName,
@@ -47,10 +48,7 @@ case class CleanFilesCommand(
null,
forceTableClean)
} else {
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = catalog
- .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val cleanFilesPreEvent: CleanFilesPreEvent =
CleanFilesPreEvent(carbonTable,
sparkSession)
@@ -59,7 +57,7 @@ case class CleanFilesCommand(
CarbonStore.cleanFiles(
GetDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
- relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
+ CarbonProperties.getStorePath,
carbonTable,
forceTableClean)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
index 6a0465c..5b305ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
@@ -35,9 +35,7 @@ case class DeleteLoadByIdCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val operationContext = new OperationContext
val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
index 83f41bb..00c35a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
@@ -37,9 +37,7 @@ case class DeleteLoadByLoadDateCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val operationContext = new OperationContext
val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
DeleteSegmentByDatePreEvent(carbonTable,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
index 3f0e093..845a64c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
@@ -47,7 +47,7 @@ case class LoadTableByInsertCommand(
Some(df)).run(sparkSession)
// updating relation metadata. This is in case of auto detect high cardinality
relation.carbonRelation.metaData =
- CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+ CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)
load
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 777c169..0f4ca01 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -54,7 +55,8 @@ case class LoadTableCommand(
isOverwriteTable: Boolean,
var inputSqlString: String = null,
dataFrame: Option[DataFrame] = None,
- updateModel: Option[UpdateTableModel] = None)
+ updateModel: Option[UpdateTableModel] = None,
+ var tableInfoOp: Option[TableInfo] = None)
extends RunnableCommand with DataProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -72,16 +74,6 @@ case class LoadTableCommand(
}
val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- if (null == relation.tableMeta.carbonTable) {
- LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
- LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
- sys.error(s"Data loading failed. table not found: $dbName.$tableName")
- }
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -105,18 +97,30 @@ case class LoadTableCommand(
// update the property with new value
carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
-
- val tableProperties = relation.tableMeta.carbonTable.getTableInfo
- .getFactTable.getTableProperties
+ try {
+ val table = if (tableInfoOp.isDefined) {
+ CarbonTable.buildFromTableInfo(tableInfoOp.get)
+ } else {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ sys.error(s"Table $dbName.$tableName does not exist")
+ }
+ if (null == relation.carbonTable) {
+ LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+ LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+ sys.error(s"Data loading failed. table not found: $dbName.$tableName")
+ }
+ relation.carbonTable
+ }
- optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+ val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+ val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+ optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
+ carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
- try {
- val table = relation.tableMeta.carbonTable
val carbonLoadModel = new CarbonLoadModel()
val factPath = if (dataFrame.isDefined) {
""
@@ -137,11 +141,9 @@ case class LoadTableCommand(
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
- val storePath = relation.tableMeta.storePath
// add the start entry for the new load in the table status file
if (updateModel.isEmpty) {
- CommonUtil.
- readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable)
+ CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
}
if (isOverwriteTable) {
LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
@@ -158,8 +160,7 @@ case class LoadTableCommand(
carbonLoadModel.setUseOnePass(false)
}
// Create table and metadata folders if not exist
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
@@ -192,9 +193,9 @@ case class LoadTableCommand(
} finally {
// Once the data load is successful delete the unwanted partition files
try {
- val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+ val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
table.getDatabaseName + "/" +
- table.getFactTableName + "/"
+ table.getTableName + "/"
val fileType = FileFactory.getFileType(partitionLocation)
if (FileFactory.isFileExist(partitionLocation, fileType)) {
val file = FileFactory
@@ -234,7 +235,7 @@ case class LoadTableCommand(
.getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
val dimensions = carbonTable.getDimensionByTableName(
- carbonTable.getFactTableName).asScala.toArray
+ carbonTable.getTableName).asScala.toArray
val colDictFilePath = carbonLoadModel.getColDictFilePath
if (!StringUtils.isEmpty(colDictFilePath)) {
carbonLoadModel.initPredefDictMap()
@@ -378,8 +379,7 @@ case class LoadTableCommand(
val identifier = model.table.getCarbonTableIdentifier
// update CarbonDataLoadSchema
val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
- identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a52008a..efb6796 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -64,24 +63,18 @@ object DeleteExecution {
sparkSession: SparkSession,
dataRdd: RDD[Row],
timestamp: String,
- relation: CarbonRelation,
isUpdateOperation: Boolean,
- executorErrors: ExecutionErrors
- ): Boolean = {
+ executorErrors: ExecutionErrors): Boolean = {
var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
val tableName = getTableIdentifier(identifier).table
val database = GetDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
- asInstanceOf[CarbonRelation]
-
- val absoluteTableIdentifier = relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+ val carbonTable = CarbonEnv.getCarbonTable(Some(database), tableName)(sparkSession)
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier)
val factPath = carbonTablePath.getFactDir
- val carbonTable = relation.tableMeta.carbonTable
var deleteStatus = true
val deleteRdd = if (isUpdateOperation) {
val schema =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 34daf4e..6762489 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -54,7 +54,7 @@ object HorizontalCompaction {
}
var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
- val carbonTable = carbonRelation.tableMeta.carbonTable
+ val carbonTable = carbonRelation.carbonTable
val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val updateTimeStamp = System.currentTimeMillis()
// To make sure that update and delete timestamps are not same,
@@ -116,7 +116,7 @@ object HorizontalCompaction {
factTimeStamp: Long,
segLists: util.List[String]): Unit = {
val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
+ val table = carbonTable.getTableName
// get the valid segments qualified for update compaction.
val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
absTableIdentifier,
@@ -133,7 +133,7 @@ object HorizontalCompaction {
try {
// Update Compaction.
val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
- carbonTable.getFactTableName,
+ carbonTable.getTableName,
Some(segmentUpdateStatusManager),
CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
Some(factTimeStamp),
@@ -167,7 +167,7 @@ object HorizontalCompaction {
segLists: util.List[String]): Unit = {
val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
+ val table = carbonTable.getTableName
val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
absTableIdentifier,
segmentUpdateStatusManager,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index b18ab78..5817d88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -60,7 +60,7 @@ object IUDCommonUtil {
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
.getDatabaseName + "." +
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .getFactTableName
+ .getTableName
val sementProperty = carbonProperties
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbAndTb, "")
if (!(sementProperty.equals("") || sementProperty.trim.equals("*"))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index a898822..cf5bfd8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -51,7 +51,7 @@ private[sql] case class ProjectForDeleteCommand(
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
+ val carbonTable = relation.carbonTable
// trigger event for Delete from table
val operationContext = new OperationContext
@@ -77,9 +77,8 @@ private[sql] case class ProjectForDeleteCommand(
// handle the clean up of IUD.
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
- if (DeleteExecution
- .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
- isUpdateOperation = false, executorErrors)) {
+ if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
+ isUpdateOperation = false, executorErrors)) {
// call IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
isUpdateOperation = false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 549c58f..da62f27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
import org.apache.carbondata.processing.loading.FailureCauses
@@ -58,7 +57,7 @@ private[sql] case class ProjectForUpdateCommand(
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
+ val carbonTable = relation.carbonTable
// trigger event for Update table
val operationContext = new OperationContext
@@ -74,7 +73,7 @@ private[sql] case class ProjectForUpdateCommand(
val currentTime = CarbonUpdateUtil.readCurrentTime
// var dataFrame: DataFrame = null
var dataSet: DataFrame = null
- val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
+ val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
try {
lockStatus = metadataLock.lockWithRetries()
if (lockStatus) {
@@ -83,7 +82,6 @@ private[sql] case class ProjectForUpdateCommand(
else {
throw new Exception("Table is locked for updation. Please try after some time")
}
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
// Get RDD.
dataSet = if (isPersistEnabled) {
@@ -93,7 +91,7 @@ private[sql] case class ProjectForUpdateCommand(
else {
Dataset.ofRows(sparkSession, plan)
}
- var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+ val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
// handle the clean up of IUD.
@@ -101,8 +99,7 @@ private[sql] case class ProjectForUpdateCommand(
// do delete operation.
DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
- currentTime + "",
- relation, isUpdateOperation = true, executionErrors)
+ currentTime + "", isUpdateOperation = true, executionErrors)
if(executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index acd9bd3..5a0e4cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -65,8 +65,7 @@ case class AlterTableDropCarbonPartitionCommand(
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val tablePath = relation.tableMeta.tablePath
+ val tablePath = relation.carbonTable.getTablePath
carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
@@ -75,7 +74,7 @@ case class AlterTableDropCarbonPartitionCommand(
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
}
- val table = relation.tableMeta.carbonTable
+ val table = relation.carbonTable
val partitionInfo = table.getPartitionInfo(tableName)
if (partitionInfo == null) {
sys.error(s"Table $tableName is not a partition table.")
@@ -101,7 +100,7 @@ case class AlterTableDropCarbonPartitionCommand(
sys.error(s"Dropping range interval partition isn't support yet!")
}
partitionInfo.dropPartition(partitionIndex)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -142,17 +141,13 @@ case class AlterTableDropCarbonPartitionCommand(
locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
locksToBeAcquired)(sparkSession)
val carbonLoadModel = new CarbonLoadModel()
- val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val table = relation.tableMeta.carbonTable
+ val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setTablePath(relation.tableMeta.tablePath)
+ carbonLoadModel.setTableName(table.getTableName)
+ carbonLoadModel.setDatabaseName(table.getDatabaseName)
+ carbonLoadModel.setTablePath(table.getTablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
alterTableDropPartition(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 0973226..841da67 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -69,8 +69,7 @@ case class AlterTableSplitCarbonPartitionCommand(
val tableName = splitPartitionModel.tableName
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val tablePath = relation.tableMeta.tablePath
+ val tablePath = relation.carbonTable.getTablePath
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
}
@@ -79,7 +78,7 @@ case class AlterTableSplitCarbonPartitionCommand(
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
}
- val table = relation.tableMeta.carbonTable
+ val table = relation.carbonTable
val partitionInfo = table.getPartitionInfo(tableName)
val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
// keep a copy of partitionIdList before update partitionInfo.
@@ -95,7 +94,7 @@ case class AlterTableSplitCarbonPartitionCommand(
updatePartitionInfo(partitionInfo, partitionIds)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -150,16 +149,12 @@ case class AlterTableSplitCarbonPartitionCommand(
locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
locksToBeAcquired)(sparkSession)
val carbonLoadModel = new CarbonLoadModel()
- val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- val tablePath = relation.tableMeta.tablePath
- val table = relation.tableMeta.carbonTable
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+ val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ val tablePath = table.getTablePath
val dataLoadSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+ carbonLoadModel.setTableName(table.getTableName)
+ carbonLoadModel.setDatabaseName(table.getDatabaseName)
carbonLoadModel.setTablePath(tablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
index 224304a..903e93b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
@@ -41,10 +41,9 @@ private[sql] case class ShowCarbonPartitionsCommand(
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val tableName = carbonTable.getFactTableName
+ .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val carbonTable = relation.carbonTable
+ val tableName = carbonTable.getTableName
val partitionInfo = carbonTable.getPartitionInfo(
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
if (partitionInfo == null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dd002f0..3854f76 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -73,7 +73,7 @@ case class CreatePreAggregateTableCommand(
// getting the parent table
val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
- val parentTableName = parentTable.getFactTableName
+ val parentTableName = parentTable.getTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
@@ -85,9 +85,8 @@ case class CreatePreAggregateTableCommand(
tableModel.dataMapRelation = Some(fieldRelationMap)
CarbonCreateTableCommand(tableModel).run(sparkSession)
try {
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation( tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
- val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+ val tableInfo = table.getTableInfo
// child schema object which will be updated on parent table about the
val childSchema = tableInfo.getFactTable
.buildChildSchema(dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 506a405..f64deec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -84,7 +84,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
s"Cannot change data type for columns in pre-aggregate table ${ carbonTable.getDatabaseName
- }.${ carbonTable.getFactTableName }")
+ }.${ carbonTable.getTableName }")
}
}
}
@@ -102,7 +102,7 @@ object PreAggregateAddColumnsPreListener extends OperationEventListener {
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
s"Cannot add columns in pre-aggreagate table ${ carbonTable.getDatabaseName
- }.${ carbonTable.getFactTableName }")
+ }.${ carbonTable.getTableName }")
}
}
}
@@ -185,7 +185,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
- carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
+ carbonTable.getDatabaseName}.${ carbonTable.getTableName }")
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 3193310..1647f9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -326,21 +326,19 @@ object PreAggregateUtil {
var numberOfCurrentChild: Int = 0
try {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
// get the latest carbon table and check for column existence
// read the latest schema file
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(
+ carbonTable.getAbsoluteTableIdentifier)
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(thriftTableInfo,
- dbName,
- tableName,
- carbonTable.getTablePath)
+ val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getTablePath)
numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
if (wrapperTableInfo.getDataMapSchemaList.asScala.
exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
@@ -374,7 +372,7 @@ object PreAggregateUtil {
def updateSchemaInfo(carbonTable: CarbonTable,
thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
val dbName = carbonTable.getDatabaseName
- val tableName = carbonTable.getFactTableName
+ val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
.updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
carbonTable.getCarbonTableIdentifier,
@@ -435,31 +433,30 @@ object PreAggregateUtil {
def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
carbonTable.getTableLastUpdatedTime
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
- metastore
- .revertTableSchemaForPreAggCreationFailure(carbonTable.getAbsoluteTableIdentifier,
- thriftTable)(sparkSession)
+ metastore.revertTableSchemaForPreAggCreationFailure(
+ carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
}
}
def getChildCarbonTable(databaseName: String, tableName: String)
(sparkSession: SparkSession): Option[CarbonTable] = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- metaStore.getTableFromMetadataCache(databaseName, tableName) match {
- case Some(tableMeta) => Some(tableMeta.carbonTable)
- case None => try {
+ val carbonTable = metaStore.getTableFromMetadataCache(databaseName, tableName)
+ if (carbonTable.isEmpty) {
+ try {
Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation].metaData.carbonTable)
} catch {
case _: Exception =>
None
}
+ } else {
+ carbonTable
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 2132131..3b39334 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -57,9 +57,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
// older carbon table and this can lead to inconsistent state in the system. Therefor look
// up relation should be called after acquiring the lock
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val alterTableAddColumnListener = AlterTableAddColumnPreEvent(carbonTable,
alterTableAddColumnsModel)
OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index e44899e..c24a8e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -51,9 +51,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(carbonTable,
alterTableDataTypeChangeModel)
OperationListenerBus.getInstance().fireEvent(alterTableDataTypeChangeListener)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index dae2d7b..721dd0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -53,9 +53,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val partitionInfo = carbonTable.getPartitionInfo(tableName)
if (partitionInfo != null) {
val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f1cce13..e7beedd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -81,9 +81,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
locks = AlterTableUtil
.validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
sparkSession)
- val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
- .asInstanceOf[CarbonRelation].tableMeta
- carbonTable = tableMeta.carbonTable
+ carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].carbonTable
// invalid data map for the old table, see CARBON-1690
val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
@@ -134,7 +133,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
carbonTable.getCarbonTableIdentifier,
tableInfo,
schemaEvolutionEntry,
- tableMeta.tablePath)(sparkSession)
+ carbonTable.getTablePath)(sparkSession)
val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
carbonTable,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index c126b25..a060833 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -357,11 +357,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
private def getPartitioning(carbonTable: CarbonTable,
output: Seq[Attribute]): Partitioning = {
- val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+ val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)
if (info != null) {
val cols = info.getListOfColumns.asScala
val sortColumn = carbonTable.
- getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
+ getDimensionByTableName(carbonTable.getTableName).get(0).getColName
val numBuckets = info.getNumberOfBuckets
val bucketColumns = cols.flatMap { n =>
val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ef2e0a5..d6450c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -51,8 +51,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .tableExists(tableIdentifier)(
- sparkSession)
+ .tableExists(tableIdentifier)(sparkSession)
if (isCarbonTable) {
val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
@@ -155,13 +154,13 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil
case AlterTableSetPropertiesCommand(tableName, properties, isView)
- if (CarbonEnv.getInstance(sparkSession).carbonMetastore
- .tableExists(tableName)(sparkSession)) => {
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(tableName)(sparkSession) => {
ExecutedCommandExec(AlterTableSetCommand(tableName, properties, isView)) :: Nil
}
case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView)
- if (CarbonEnv.getInstance(sparkSession).carbonMetastore
- .tableExists(tableName)(sparkSession)) => {
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(tableName)(sparkSession) => {
ExecutedCommandExec(AlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
}
case _ => Nil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 9ebf47e..49a57e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.AlterTableRenameCommand
import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.hive.CarbonRelation
@@ -76,7 +76,6 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession)
.asInstanceOf[CarbonRelation]
- .tableMeta
.carbonTable
.isStreamingTable
if (streaming) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6d80a26..87c919d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
@@ -44,13 +43,12 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.util.CarbonSparkUtil
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
+case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
// clear the metadata
def clear(): Unit = {
- tablesMeta.clear()
+ carbonTables.clear()
}
}
@@ -80,7 +78,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
System.nanoTime() + ""
}
- val metadata = MetaData(new ArrayBuffer[TableMeta]())
+ val metadata = MetaData(new ArrayBuffer[CarbonTable]())
/**
@@ -98,13 +96,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
val tables = getTableFromMetadataCache(database, tableName)
tables match {
case Some(t) =>
- CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+ CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
case None =>
readCarbonSchema(absIdentifier) match {
case Some(meta) =>
CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+ CarbonSparkUtil.createSparkMeta(meta), meta)
case None =>
throw new NoSuchTableException(database, tableName)
}
@@ -151,7 +148,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
val operationContext = new OperationContext
val lookupRelationPostEvent: LookupRelationPostEvent =
LookupRelationPostEvent(
- relation.tableMeta.carbonTable,
+ relation.carbonTable,
sparkSession)
OperationListenerBus.getInstance.fireEvent(lookupRelationPostEvent, operationContext)
relation
@@ -164,10 +161,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
* @param tableName
* @return
*/
- def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
- metadata.tablesMeta
- .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+ def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
+ metadata.carbonTables
+ .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
+ table.getTableName.equalsIgnoreCase(tableName))
}
def tableExists(
@@ -187,7 +184,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
true
}
- private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+ private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[CarbonTable] = {
val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
val tableName = identifier.getCarbonTableIdentifier.getTableName
val tablePath = identifier.getTablePath
@@ -210,12 +207,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
- identifier.getTablePath,
- identifier.getTablePath,
- carbonTable)
- metadata.tablesMeta += tableMeta
- Some(tableMeta)
+ metadata.carbonTables += carbonTable
+ Some(carbonTable)
} else {
None
}
@@ -378,10 +371,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getTablePath,
- absoluteTableIdentifier.getTablePath,
- CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
- metadata.tablesMeta += tableMeta
+ metadata.carbonTables +=
+ CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
}
/**
@@ -391,10 +382,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
* @param tableName
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
- metadataToBeRemoved match {
- case Some(tableMeta) =>
- metadata.tablesMeta -= tableMeta
+ val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
+ carbonTableToBeRemoved match {
+ case Some(carbonTable) =>
+ metadata.carbonTables -= carbonTable
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
case None =>
if (LOGGER.isDebugEnabled) {
@@ -409,10 +400,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
wrapperTableInfo.getTableUniqueName)
- for (i <- metadata.tablesMeta.indices) {
+ for (i <- metadata.carbonTables.indices) {
if (wrapperTableInfo.getTableUniqueName.equals(
- metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
- metadata.tablesMeta(i).carbonTable = carbonTable
+ metadata.carbonTables(i).getTableUniqueName)) {
+ metadata.carbonTables(i) = carbonTable
}
}
}
@@ -434,8 +425,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
try {
- val tablePath = lookupRelation(tableIdentifier)(sparkSession).
- asInstanceOf[CarbonRelation].tableMeta.tablePath
+ val tablePath = lookupRelation(tableIdentifier)(sparkSession)
+ .asInstanceOf[CarbonRelation].carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
FileFactory.isFileExist(tablePath, fileType)
} catch {
@@ -531,13 +522,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
private def refreshCache() {
- metadata.tablesMeta.clear()
+ metadata.carbonTables.clear()
}
override def isReadFromHiveMetaStore: Boolean = false
override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
- metadata.tablesMeta.map(_.carbonTable)
+ metadata.carbonTables
override def getThriftTableInfo(tablePath: CarbonTablePath)
(sparkSession: SparkSession): TableInfo = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index dedaf1c..4d4229a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -21,20 +21,17 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.format
import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
/**
* Metastore to store carbonschema in hive
@@ -56,10 +53,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
if (info != null) {
val table = CarbonTable.buildFromTableInfo(info)
- val meta = new TableMeta(table.getCarbonTableIdentifier,
- absIdentifier.getTablePath, absIdentifier.getTablePath, table)
CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
- CarbonSparkUtil.createSparkMeta(table), meta)
+ CarbonSparkUtil.createSparkMeta(table), table)
} else {
super.createCarbonRelation(parameters, absIdentifier, sparkSession)
}
@@ -107,7 +102,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
carbonTable.getDatabaseName,
- carbonTable.getFactTableName)
+ carbonTable.getTableName)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 357a812..696342f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
+
/**
* Interface for Carbonmetastore
@@ -140,7 +140,7 @@ trait CarbonMetaStore {
def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
- def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta]
+ def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 3fb0db0..c48e6e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -761,7 +761,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
.DEFAULT_MAX_NUMBER_OF_COLUMNS
)
}
- val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+ val isAggregateTable = !relation.carbonRelation.carbonTable.getTableInfo
.getParentRelationIdentifiers.isEmpty
// transform logical plan if the load is for aggregate table.
val childPlan = if (isAggregateTable) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 2c476ed..9187fe2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -29,10 +29,10 @@ import org.apache.spark.sql.types._
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.merger.TableMeta
/**
* Represents logical plan for one carbon table
@@ -41,7 +41,7 @@ case class CarbonRelation(
databaseName: String,
tableName: String,
var metaData: CarbonMetaData,
- tableMeta: TableMeta)
+ carbonTable: CarbonTable)
extends LeafNode with MultiInstanceRelation {
def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
@@ -84,17 +84,17 @@ case class CarbonRelation(
}
override def newInstance(): LogicalPlan = {
- CarbonRelation(databaseName, tableName, metaData, tableMeta)
+ CarbonRelation(databaseName, tableName, metaData, carbonTable)
.asInstanceOf[this.type]
}
- val dimensionsAttr = {
+ val dimensionsAttr: Seq[AttributeReference] = {
val sett = new LinkedHashSet(
- tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
+ carbonTable.getDimensionByTableName(carbonTable.getTableName)
.asScala.asJava)
sett.asScala.toSeq.map(dim => {
val dimval = metaData.carbonTable
- .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+ .getDimensionByName(metaData.carbonTable.getTableName, dim.getColName)
val output: DataType = dimval.getDataType.getName.toLowerCase match {
case "array" =>
CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
@@ -113,11 +113,10 @@ case class CarbonRelation(
}
val measureAttr = {
- val factTable = tableMeta.carbonTable.getFactTableName
+ val factTable = carbonTable.getTableName
new LinkedHashSet(
- tableMeta.carbonTable.
- getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
- asScala.asJava).asScala.toSeq.map { x =>
+ carbonTable.getMeasureByTableName(carbonTable.getTableName).asScala.asJava).asScala.toSeq
+ .map { x =>
val metastoreType = metaData.carbonTable.getMeasureByName(factTable, x.getColName)
.getDataType.getName.toLowerCase match {
case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
@@ -131,7 +130,7 @@ case class CarbonRelation(
}
override val output = {
- val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+ val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala
// convert each column to Attribute
columns.filter(!_.isInvisible).map { column =>
@@ -196,12 +195,11 @@ case class CarbonRelation(
def sizeInBytes: Long = {
val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
- tableMeta.carbonTable.getAbsoluteTableIdentifier)
+ carbonTable.getAbsoluteTableIdentifier)
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
val tablePath = CarbonStorePath.getCarbonTablePath(
- tableMeta.storePath,
- tableMeta.carbonTableIdentifier).getPath
+ carbonTable.getAbsoluteTableIdentifier).getPath
val fileType = FileFactory.getFileType(tablePath)
if(FileFactory.isFileExist(tablePath, fileType)) {
tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index e587395..b0aecd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events._
/**
@@ -106,15 +107,15 @@ class CarbonSessionCatalog(
alias: Option[String],
carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
var isRefreshed = false
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
+ val storePath = CarbonProperties.getStorePath
carbonEnv.carbonMetastore.
checkSchemasModifiedTimeAndReloadTables()
- val tableMeta = carbonEnv.carbonMetastore
- .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
- carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
- if (tableMeta.isEmpty || (tableMeta.isDefined &&
- tableMeta.get.carbonTable.getTableLastUpdatedTime !=
+ val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+ carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+ carbonDatasourceHadoopRelation.carbonTable.getTableName)
+ if (table.isEmpty || (table.isDefined &&
+ table.get.getTableLastUpdatedTime !=
carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
refreshTable(identifier)
DataMapStoreManager.getInstance().
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index b76b24e..9cc5d86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -42,11 +42,11 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
var databaseLocation = ""
try {
databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
} catch {
case e: NoSuchDatabaseException =>
// ignore the exception as exception will be handled by hive command.run
- databaseLocation = CarbonEnv.getInstance(sparkSession).storePath
+ databaseLocation = CarbonProperties.getStorePath
}
// DropHiveDB command will fail if cascade is false and one or more table exists in database
if (command.cascade && tablesInDB != null) {