Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/04 06:36:12 UTC
[2/3] carbondata git commit: [CARBONDATA-1844] Add tablePath support when creating table
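In user terms, this patch lets a table's physical location be supplied explicitly instead of being derived from the fixed store path. A minimal sketch of the DataFrame writer path changed below, assuming the usual "carbondata" format name; tableName/dbName/tablePath are the options defined in CarbonOption, and df is an existing DataFrame:

    import org.apache.spark.sql.SaveMode

    df.write
      .format("carbondata")
      .option("tableName", "t1")
      .option("dbName", "mydb")          // optional; current database is used when absent
      .option("tablePath", "/data/t1")   // new: emitted as LOCATION '/data/t1'
      .mode(SaveMode.Overwrite)
      .save()
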
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
index ab77fba..b305fa9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
@@ -27,7 +27,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
* @param ifExistsSet
* @param sparkSession
*/
-case class DropTablePreEvent(carbonTable: Option[CarbonTable],
+case class DropTablePreEvent(
+ carbonTable: CarbonTable,
ifExistsSet: Boolean,
sparkSession: SparkSession)
extends Event with DropTableEventInfo
@@ -39,7 +40,8 @@ case class DropTablePreEvent(carbonTable: Option[CarbonTable],
* @param ifExistsSet
* @param sparkSession
*/
-case class DropTablePostEvent(carbonTable: Option[CarbonTable],
+case class DropTablePostEvent(
+ carbonTable: CarbonTable,
ifExistsSet: Boolean,
sparkSession: SparkSession)
extends Event with DropTableEventInfo
@@ -51,7 +53,8 @@ case class DropTablePostEvent(carbonTable: Option[CarbonTable],
* @param ifExistsSet
* @param sparkSession
*/
-case class DropTableAbortEvent(carbonTable: Option[CarbonTable],
+case class DropTableAbortEvent(
+ carbonTable: CarbonTable,
ifExistsSet: Boolean,
sparkSession: SparkSession)
extends Event with DropTableEventInfo
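With carbonTable now mandatory, event producers pass the table directly instead of wrapping it in Option. A minimal firing sketch, assuming the OperationListenerBus pattern used by this events framework (carbonTable and sparkSession in scope at the call site):

    val event = DropTablePreEvent(carbonTable, ifExistsSet = true, sparkSession)
    OperationListenerBus.getInstance().fireEvent(event, new OperationContext)
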
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 9c542b6..6279fca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.events
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -35,7 +35,7 @@ trait DatabaseEventInfo {
* event for table related operations
*/
trait TableEventInfo {
- val carbonTableIdentifier: CarbonTableIdentifier
+ val identifier: AbsoluteTableIdentifier
}
/**
@@ -57,7 +57,7 @@ trait LookupRelationEventInfo {
* event for drop table
*/
trait DropTableEventInfo {
- val carbonTable: Option[CarbonTable]
+ val carbonTable: CarbonTable
val ifExistsSet: Boolean
}
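Since TableEventInfo now exposes an AbsoluteTableIdentifier and DropTableEventInfo a plain CarbonTable, consumers no longer unwrap an Option. A hypothetical listener sketch, assuming the framework's OperationEventListener trait:

    import org.apache.carbondata.events._

    class DropTableLogger extends OperationEventListener {
      override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case drop: DropTablePreEvent =>
            // the table path is available directly, no Option unwrapping
            println(s"dropping table at ${drop.carbonTable.getTablePath}")
          case _ => // ignore other events
        }
      }
    }
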
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 4500221..594ea0e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -17,21 +17,17 @@
package org.apache.carbondata.spark
-import org.apache.carbondata.core.constants.CarbonCommonConstants
/**
* Contains all options for Spark data source
*/
class CarbonOption(options: Map[String, String]) {
- def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
- def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+ def dbName: Option[String] = options.get("dbName")
def tableName: String = options.getOrElse("tableName", "default_table")
- def tablePath: String = s"$dbName/$tableName"
-
- def tableId: String = options.getOrElse("tableId", "default_table_id")
+ def tablePath: Option[String] = options.get("tablePath")
def partitionCount: String = options.getOrElse("partitionCount", "1")
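A minimal sketch of the new behaviour: dbName and tablePath stay unresolved here and are filled in later from the session (CarbonEnv.getDatabaseName / getTablePath) instead of falling back to constants:

    val opts = new CarbonOption(Map("tableName" -> "t1", "tablePath" -> "/data/t1"))
    opts.dbName      // None: resolved later against the session's current database
    opts.tablePath   // Some("/data/t1")
    opts.tableName   // "t1"
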
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 3f47a47..44fc7ad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -505,7 +505,6 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
* @param sparkContext spark context
* @param table carbon table identifier
* @param dimensions carbon dimensions having predefined dict
- * @param hdfsLocation carbon base store path
* @param dictFolderPath path of dictionary folder
*/
class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
@@ -513,7 +512,6 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
sparkContext: SparkContext,
table: CarbonTableIdentifier,
dimensions: Array[CarbonDimension],
- hdfsLocation: String,
dictFolderPath: String)
extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 50005c8..2f190b5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -53,8 +53,8 @@ class CarbonIUDMergerRDD[K, V](
override def getPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
- val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
- hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+ val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
val jobConf: JobConf = new JobConf(new Configuration)
val job: Job = new Job(jobConf)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 997838c..82b2a57 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -67,7 +67,7 @@ class CarbonMergerRDD[K, V](
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
var storeLocation: String = null
var mergeResult: String = null
- val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
+ val tablePath = carbonMergerMapping.hdfsStoreLocation
val metadataFilePath = carbonMergerMapping.metadataFilePath
val mergedLoadName = carbonMergerMapping.mergedLoadName
val databaseName = carbonMergerMapping.databaseName
@@ -167,7 +167,7 @@ class CarbonMergerRDD[K, V](
val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
- carbonLoadModel.setTablePath(hdfsStoreLocation)
+ carbonLoadModel.setTablePath(tablePath)
// check for restructured block
// TODO: only in case of add and drop this variable should be true
val restructuredBlockExists: Boolean = CarbonCompactionUtil
@@ -260,8 +260,8 @@ class CarbonMergerRDD[K, V](
override def getPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
- val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
- hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+ val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
absoluteTableIdentifier)
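Both merger RDDs now build the identifier from the table path through the factory method rather than a store location plus identifier. A minimal sketch with hypothetical values:

    val identifier = AbsoluteTableIdentifier.from(
      "/warehouse/mydb/t1",                               // table path, not store path
      new CarbonTableIdentifier("mydb", "t1", "tableId"))
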
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 11e5baf..7316574 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -330,9 +330,7 @@ class CarbonScanRDD(
private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
CarbonTableInputFormat.setTableInfo(conf, tableInfo)
CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
- CarbonTableInputFormat
- .setTableName(conf,
- tableInfo.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier.getTableName)
+ CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
createInputFormat(conf)
}
@@ -341,9 +339,7 @@ class CarbonScanRDD(
val tableInfo1 = getTableInfo
CarbonTableInputFormat.setTableInfo(conf, tableInfo1)
CarbonTableInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
- CarbonTableInputFormat
- .setTableName(conf,
- tableInfo1.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier.getTableName)
+ CarbonTableInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
CarbonTableInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
createInputFormat(conf)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
index e304d84..08d635b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.util.DataTypeUtil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index af1d3f8..3595884 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -529,7 +529,7 @@ object CommonUtil {
CarbonLoaderUtil.populateNewLoadMetaEntry(
newLoadMetaEntry, status, model.getFactTimeStamp, false)
val entryAdded: Boolean =
- CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
+ CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
if (!entryAdded) {
sys.error(s"Failed to add entry in table status for " +
s"${ model.getDatabaseName }.${model.getTableName}")
@@ -550,7 +550,7 @@ object CommonUtil {
val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
CarbonLoaderUtil
.populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp, true)
- val updationStatus = CarbonLoaderUtil.recordLoadMetadata(loadMetaEntry, model, false, false)
+ val updationStatus = CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false)
if (!updationStatus) {
sys
.error(s"Failed to update failure entry in table status for ${
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index f6170e8..ccbc9f5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -292,14 +292,13 @@ object GlobalDictionaryUtil {
* @param carbonLoadModel carbon load model
* @param table CarbonTableIdentifier
* @param dimensions column list
- * @param hdfsLocation store location in HDFS
- * @param dictfolderPath path of dictionary folder
+ * @param dictFolderPath path of dictionary folder
*/
- def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
+ def createDictionaryLoadModel(
+ carbonLoadModel: CarbonLoadModel,
table: CarbonTableIdentifier,
dimensions: Array[CarbonDimension],
- hdfsLocation: String,
- dictfolderPath: String,
+ dictFolderPath: String,
forPreDefDict: Boolean): DictionaryLoadModel = {
val primDimensionsBuffer = new ArrayBuffer[CarbonDimension]
val isComplexes = new ArrayBuffer[Boolean]
@@ -312,7 +311,7 @@ object GlobalDictionaryUtil {
}
val primDimensions = primDimensionsBuffer.map { x => x }.toArray
val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
- getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+ getDictionaryDetail(dictFolderPath, primDimensions, table, carbonLoadModel.getTablePath)
val dictFilePaths = dictDetail.dictFilePaths
val dictFileExists = dictDetail.dictFileExists
val columnIdentifier = dictDetail.columnIdentifiers
@@ -327,11 +326,12 @@ object GlobalDictionaryUtil {
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
- val absoluteTableIdentifier = new AbsoluteTableIdentifier(hdfsLocation, table)
- DictionaryLoadModel(absoluteTableIdentifier,
+ val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
+ DictionaryLoadModel(
+ absoluteTableIdentifier,
dimensions,
- hdfsLocation,
- dictfolderPath,
+ carbonLoadModel.getTablePath,
+ dictFolderPath,
dictFilePaths,
dictFileExists,
isComplexes.toArray,
@@ -505,7 +505,6 @@ object GlobalDictionaryUtil {
* @param dimensions dimension column
* @param carbonLoadModel carbon load model
* @param sqlContext spark sql context
- * @param hdfsLocation store location on hdfs
* @param dictFolderPath generated global dict file path
*/
def generatePredefinedColDictionary(colDictFilePath: String,
@@ -513,15 +512,14 @@ object GlobalDictionaryUtil {
dimensions: Array[CarbonDimension],
carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
- hdfsLocation: String,
dictFolderPath: String): Unit = {
// set pre defined dictionary column
setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
- hdfsLocation, dictFolderPath, forPreDefDict = true)
+ dictFolderPath, forPreDefDict = true)
// new RDD to achieve distributed column dict generation
val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
- sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
+ sqlContext.sparkContext, table, dimensions, dictFolderPath)
.partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
// check result status
@@ -681,15 +679,16 @@ object GlobalDictionaryUtil {
* @param sqlContext sql context
* @param carbonLoadModel carbon load model
*/
- def generateGlobalDictionary(sqlContext: SQLContext,
+ def generateGlobalDictionary(
+ sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- tablePath: String,
hadoopConf: Configuration,
dataFrame: Option[DataFrame] = None): Unit = {
try {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
// create dictionary folder if not exists
+ val tablePath = carbonLoadModel.getTablePath
val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
// columns which need to generate global dictionary file
@@ -709,7 +708,7 @@ object GlobalDictionaryUtil {
if (colDictFilePath != null) {
// generate predefined dictionary
generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
- dimensions, carbonLoadModel, sqlContext, tablePath, dictfolderPath)
+ dimensions, carbonLoadModel, sqlContext, dictfolderPath)
}
if (headers.length > df.columns.length) {
val msg = "The number of columns in the file header do not match the " +
@@ -725,7 +724,7 @@ object GlobalDictionaryUtil {
// select column to push down pruning
df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
- requireDimension, tablePath, dictfolderPath, false)
+ requireDimension, dictfolderPath, false)
// combine distinct value in a block and partition by column
val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -737,9 +736,9 @@ object GlobalDictionaryUtil {
LOGGER.info("No column found for generating global dictionary in source data files")
}
} else {
- generateDictionaryFromDictionaryFiles(sqlContext,
+ generateDictionaryFromDictionaryFiles(
+ sqlContext,
carbonLoadModel,
- tablePath,
carbonTableIdentifier,
dictfolderPath,
dimensions,
@@ -764,11 +763,11 @@ object GlobalDictionaryUtil {
}
}
- def generateDictionaryFromDictionaryFiles(sqlContext: SQLContext,
+ def generateDictionaryFromDictionaryFiles(
+ sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
carbonTableIdentifier: CarbonTableIdentifier,
- dictfolderPath: String,
+ dictFolderPath: String,
dimensions: Array[CarbonDimension],
allDictionaryPath: String): Unit = {
LOGGER.info("Generate global dictionary from dictionary files!")
@@ -781,7 +780,7 @@ object GlobalDictionaryUtil {
val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
if (requireDimension.nonEmpty) {
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
- requireDimension, storePath, dictfolderPath, false)
+ requireDimension, dictFolderPath, false)
// check if dictionary files contains bad record
val accumulator = sqlContext.sparkContext.accumulator(0)
// read local dictionary file, and group by key
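The net effect on the entry point: the store/table path parameter disappears because the path now travels inside CarbonLoadModel. A minimal call sketch (sqlContext, carbonLoadModel and hadoopConf in scope at the call site):

    GlobalDictionaryUtil.generateGlobalDictionary(
      sqlContext,
      carbonLoadModel,   // the table path is read from carbonLoadModel.getTablePath
      hadoopConf)        // dataFrame defaults to None
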
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 8048b66..4ad939c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -245,7 +245,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
tableName: String,
fields: Seq[Field],
partitionCols: Seq[PartitionerField],
- tableProperties: mutable.Map[String, String],
+ tableProperties: Map[String, String],
bucketFields: Option[BucketFields],
isAlterFlow: Boolean = false,
tableComment: Option[String] = None): TableModel = {
@@ -276,10 +276,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
TableModel(
ifNotExistPresent,
- dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
dbName,
tableName,
- tableProperties,
+ tableProperties.toMap,
reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
msrs.map(f => normalizeType(f)),
Option(sortKeyDims),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 844f6f7..44f577d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -21,7 +21,6 @@ import java.util
import java.util.UUID
import scala.collection.JavaConverters._
-import scala.collection.mutable.Map
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
@@ -30,7 +29,7 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema._
@@ -48,7 +47,6 @@ import org.apache.carbondata.spark.util.DataTypeConverterUtil
case class TableModel(
ifNotExistsSet: Boolean,
- var databaseName: String,
databaseNameOp: Option[String],
tableName: String,
tableProperties: Map[String, String],
@@ -58,8 +56,7 @@ case class TableModel(
highcardinalitydims: Option[Seq[String]],
noInvertedIdxCols: Option[Seq[String]],
columnGroups: Seq[String],
- colProps: Option[util.Map[String,
- util.List[ColumnProperty]]] = None,
+ colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None,
bucketFields: Option[BucketFields],
partitionInfo: Option[PartitionInfo],
tableComment: Option[String] = None,
@@ -322,8 +319,14 @@ class AlterTableColumnSchemaGenerator(
// TODO: move this to carbon store API
object TableNewProcessor {
- def apply(cm: TableModel): TableInfo = {
- new TableNewProcessor(cm).process
+ def apply(
+ cm: TableModel,
+ identifier: AbsoluteTableIdentifier): TableInfo = {
+ new TableNewProcessor(
+ cm,
+ identifier.getDatabaseName,
+ identifier.getTableName,
+ identifier.getTablePath).process
}
def createColumnSchema(
@@ -356,7 +359,7 @@ object TableNewProcessor {
}
columnSchema.setEncodingList(encoders)
val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
- val columnUniqueId = colUniqueIdGenerator.generateUniqueId(databaseName, columnSchema)
+ val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
columnSchema.setColumnUniqueId(columnUniqueId)
columnSchema.setColumnReferenceId(columnUniqueId)
columnSchema.setColumnar(true)
@@ -370,7 +373,7 @@ object TableNewProcessor {
}
}
-class TableNewProcessor(cm: TableModel) {
+class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, tablePath: String) {
def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
@@ -420,8 +423,7 @@ class TableNewProcessor(cm: TableModel) {
}
columnSchema.setEncodingList(encoders)
val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
- val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
- columnSchema)
+ val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
columnSchema.setColumnUniqueId(columnUniqueId)
columnSchema.setColumnReferenceId(columnUniqueId)
columnSchema.setDimensionColumn(isDimensionCol)
@@ -529,7 +531,6 @@ class TableNewProcessor(cm: TableModel) {
LOGGER.error(s"Duplicate column found with name: $name")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation " +
- s"for ${ cm.databaseName }.${ cm.tableName }" +
s"Duplicate column found with name: $name")
CarbonException.analysisException(s"Duplicate dimensions found with name: $name")
}
@@ -621,11 +622,12 @@ class TableNewProcessor(cm: TableModel) {
partitionInfo.setColumnSchemaList(partitionCols)
tableSchema.setPartitionInfo(partitionInfo)
}
- tableSchema.setTableName(cm.tableName)
+ tableSchema.setTableName(tableName)
tableSchema.setListOfColumns(allColumns.asJava)
tableSchema.setSchemaEvalution(schemaEvol)
- tableInfo.setDatabaseName(cm.databaseName)
- tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(cm.databaseName, cm.tableName))
+ tableInfo.setTablePath(tablePath)
+ tableInfo.setDatabaseName(dbName)
+ tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
tableInfo.setLastUpdatedTime(System.currentTimeMillis())
tableInfo.setFactTable(tableSchema)
tableInfo
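A minimal sketch of the new TableNewProcessor contract, using the AbsoluteTableIdentifier.from(tablePath, dbName, tableName) factory seen elsewhere in this patch (tableModel is an existing TableModel):

    val identifier = AbsoluteTableIdentifier.from("/warehouse/mydb/t1", "mydb", "t1")
    val tableInfo: TableInfo = TableNewProcessor(tableModel, identifier)
    // tableInfo now carries the table path alongside database and table name
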
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 76b9e30..6864495 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -54,7 +54,8 @@ object CarbonReflectionUtils {
.map(l => im.reflectField(l.asTerm).get).getOrElse(null)
}
- def getUnresolvedRelation(tableIdentifier: TableIdentifier,
+ def getUnresolvedRelation(
+ tableIdentifier: TableIdentifier,
version: String,
tableAlias: Option[String] = None): UnresolvedRelation = {
val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 145068f..f1b9ecd 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -286,21 +286,17 @@ object CarbonDataRDDFactory {
hadoopConf: Configuration,
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None): Unit = {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val operationContext = new OperationContext
- // for handling of the segment Merging.
-
LOGGER.audit(s"Data load request has been received for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
// Check if any load needs to be deleted before loading new data
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
DataManagementFunc.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
// create new segment folder in carbon store
if (updateModel.isEmpty) {
- CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
- carbonLoadModel.getSegmentId, carbonTable)
+ CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
}
var loadStatus = SegmentStatus.SUCCESS
var errorMessage: String = "DataLoad failure"
@@ -769,7 +765,7 @@ object CarbonDataRDDFactory {
true)
CarbonUtil
.addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
- val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
+ val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
overwriteTable)
if (!done) {
val errorMessage = "Dataload failed due to failure in table status updation."
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 4f2cf14..d433470 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -70,48 +70,6 @@ case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attr
case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-object GetDB {
-
- def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
- dbName.getOrElse(
- sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase)
- }
-
- /**
- * The method returns the database location
- * if carbon.storeLocation does point to spark.sql.warehouse.dir then returns
- * the database locationUri as database location else follows the old behaviour
- * making database location from carbon fixed store and database name.
- *
- * @param dbName
- * @param sparkSession
- * @param fixedStorePath
- * @return
- */
- def getDatabaseLocation(dbName: String, sparkSession: SparkSession,
- fixedStorePath: String): String = {
- var databaseLocation =
- sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
- .locationUri.toString
- // for default database and db ends with .db
- // check whether the carbon store and hive store is same or different.
- if (dbName.equals("default") || databaseLocation.endsWith(".db")) {
- val properties = CarbonProperties.getInstance()
- val carbonStorePath = FileFactory
- .getUpdatedFilePath(properties.getProperty(CarbonCommonConstants.STORE_LOCATION))
- val hiveStorePath = FileFactory
- .getUpdatedFilePath(sparkSession.conf.get("spark.sql.warehouse.dir"))
- // if carbon.store does not point to spark.sql.warehouse.dir then follow the old table path
- // format
- if (!hiveStorePath.equals(carbonStorePath)) {
- databaseLocation = fixedStorePath + CarbonCommonConstants.FILE_SEPARATOR + dbName
- }
- }
-
- return FileFactory.getUpdatedFilePath(databaseLocation)
- }
-}
-
case class ProjectForUpdate(
table: UnresolvedRelation,
columns: List[String],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 71590dd..ca371e1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -63,9 +63,13 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
val storePath = CarbonProperties.getStorePath
val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
.append("tempCSV")
- .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
- .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
- .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
+ .append(CarbonCommonConstants.UNDERSCORE)
+ .append(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession))
+ .append(CarbonCommonConstants.UNDERSCORE)
+ .append(options.tableName)
+ .append(CarbonCommonConstants.UNDERSCORE)
+ .append(System.nanoTime())
+ .toString
writeToTempCSVFile(tempCSVFolder, options)
val tempCSVPath = new Path(tempCSVFolder)
@@ -133,7 +137,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
private def loadDataFrame(options: CarbonOption): Unit = {
val header = dataFrame.columns.mkString(",")
CarbonLoadDataCommand(
- Some(options.dbName),
+ Some(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)),
options.tableName,
null,
Seq(),
@@ -168,18 +172,21 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
"DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
"TABLE_BLOCKSIZE" -> options.tableBlockSize
).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
+ val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
s"""
- | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
+ | CREATE TABLE IF NOT EXISTS $dbName.${options.tableName}
| (${ carbonSchema.mkString(", ") })
| STORED BY 'carbondata'
| ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" }
+ | ${ if (options.tablePath.nonEmpty) s"LOCATION '${options.tablePath.get}'" else ""}
""".stripMargin
}
private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
+ val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
s"""
| LOAD DATA INPATH '$csvFolder'
- | INTO TABLE ${options.dbName}.${options.tableName}
+ | INTO TABLE $dbName.${options.tableName}
| OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
| 'SINGLE_PASS' = '${options.singlePass}')
""".stripMargin
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 31ff323..57233cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -46,9 +46,10 @@ case class CarbonDatasourceHadoopRelation(
extends BaseRelation with InsertableRelation {
var caseInsensitiveMap = parameters.map(f => (f._1.toLowerCase, f._2))
- lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(paths.head,
- caseInsensitiveMap.getOrElse("dbname", GetDB.getDatabaseName(None, sparkSession)),
- caseInsensitiveMap.get("tablename").get)
+ lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ paths.head,
+ CarbonEnv.getDatabaseName(caseInsensitiveMap.get("dbname"))(sparkSession),
+ caseInsensitiveMap("tablename"))
lazy val databaseName: String = carbonTable.getDatabaseName
lazy val tableName: String = carbonTable.getTableName
CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 1a336c4..771a235 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -504,9 +504,9 @@ class CarbonDecoderRDD(
}
override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
- val absoluteTableIdentifiers = relations.map { relation =>
- val tableInfo = getTableInfo
- (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier)
+ val tableInfo = getTableInfo
+ val absoluteTableIdentifiers = relations.map { _ =>
+ (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier())
}.toMap
val cacheProvider: CacheProvider = CacheProvider.getInstance
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index c03b210..811442b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,18 +17,18 @@
package org.apache.spark.sql
-import java.util.Map
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonRelation, CarbonSessionCatalog, CarbonSQLConf}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.hive._
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util._
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -99,8 +99,7 @@ class CarbonEnv {
object CarbonEnv {
- val carbonEnvMap: Map[SparkSession, CarbonEnv] =
- new ConcurrentHashMap[SparkSession, CarbonEnv]
+ val carbonEnvMap = new ConcurrentHashMap[SparkSession, CarbonEnv]
def getInstance(sparkSession: SparkSession): CarbonEnv = {
if (sparkSession.isInstanceOf[CarbonSession]) {
@@ -117,18 +116,24 @@ object CarbonEnv {
}
/**
- * Return carbon table instance by looking up table in `sparkSession`
+ * Return carbon table instance from cache or by looking up table in `sparkSession`
*/
def getCarbonTable(
databaseNameOp: Option[String],
tableName: String)
(sparkSession: SparkSession): CarbonTable = {
- CarbonEnv
- .getInstance(sparkSession)
- .carbonMetastore
- .lookupRelation(databaseNameOp, tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- .carbonTable
+ val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
+ val catalog = getInstance(sparkSession).carbonMetastore
+ // refresh cache
+ catalog.checkSchemasModifiedTimeAndReloadTables()
+
+ // try to get it from cache, otherwise look it up in the catalog
+ catalog.getTableFromMetadataCache(databaseName, tableName)
+ .getOrElse(
+ catalog
+ .lookupRelation(databaseNameOp, tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable)
}
/**
@@ -137,12 +142,7 @@ object CarbonEnv {
def getCarbonTable(
tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): CarbonTable = {
- CarbonEnv
- .getInstance(sparkSession)
- .carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession)
- .asInstanceOf[CarbonRelation]
- .carbonTable
+ getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
}
/**
@@ -155,32 +155,62 @@ object CarbonEnv {
}
/**
- * Return table path for specified table
+ * The method returns the database location
+ * if carbon.storeLocation does point to spark.sql.warehouse.dir then returns
+ * the database locationUri as database location else follows the old behaviour
+ * making database location from carbon fixed store and database name.
+ * @return database location
+ */
+ def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = {
+ var databaseLocation =
+ sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
+ .locationUri.toString
+ // for default database and db ends with .db
+ // check whether the carbon store and hive store is same or different.
+ if (dbName.equals("default") || databaseLocation.endsWith(".db")) {
+ val properties = CarbonProperties.getInstance()
+ val carbonStorePath =
+ FileFactory.getUpdatedFilePath(properties.getProperty(CarbonCommonConstants.STORE_LOCATION))
+ val hiveStorePath =
+ FileFactory.getUpdatedFilePath(sparkSession.conf.get("spark.sql.warehouse.dir"))
+ // if carbon.store does not point to spark.sql.warehouse.dir then follow the old table path
+ // format
+ if (!hiveStorePath.equals(carbonStorePath)) {
+ databaseLocation = CarbonProperties.getStorePath +
+ CarbonCommonConstants.FILE_SEPARATOR +
+ dbName
+ }
+ }
+
+ FileFactory.getUpdatedFilePath(databaseLocation)
+ }
+
+ /**
+ * Return table path from carbon table. If table does not exist, construct it using
+ * database location and table name
*/
def getTablePath(
databaseNameOp: Option[String],
tableName: String
)(sparkSession: SparkSession): String = {
- val dbLocation = GetDB.getDatabaseLocation(
- databaseNameOp.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
- sparkSession,
- CarbonProperties.getStorePath)
- dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+ try {
+ getCarbonTable(databaseNameOp, tableName)(sparkSession).getTablePath
+ } catch {
+ case _: NoSuchTableException =>
+ val dbName = getDatabaseName(databaseNameOp)(sparkSession)
+ val dbLocation = getDatabaseLocation(dbName, sparkSession)
+ dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+ }
}
- /**
- * Return metadata path for specified table
- */
- def getMetadataPath(
+ def getIdentifier(
databaseNameOp: Option[String],
tableName: String
- )(sparkSession: SparkSession): String = {
- val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ )(sparkSession: SparkSession): AbsoluteTableIdentifier = {
+ AbsoluteTableIdentifier.from(
getTablePath(databaseNameOp, tableName)(sparkSession),
getDatabaseName(databaseNameOp)(sparkSession),
tableName)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- CarbonTablePath.getFolderContainingFile(schemaFilePath)
}
+
}
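A minimal sketch of the reworked helpers (getDatabaseLocation moved here from the removed GetDB object; getIdentifier supersedes getMetadataPath):

    // path from the CarbonTable when it exists, else database location + table name
    val tablePath = CarbonEnv.getTablePath(Some("mydb"), "t1")(sparkSession)
    // one-stop AbsoluteTableIdentifier for commands that need path plus names
    val identifier = CarbonEnv.getIdentifier(Some("mydb"), "t1")(sparkSession)
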
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b764132..7fb146c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
@@ -67,9 +66,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
None)
case _ =>
val options = new CarbonOption(parameters)
- val storePath = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.STORE_LOCATION)
- val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
+ val tablePath =
+ CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession)
CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
}
}
@@ -88,16 +86,17 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
"specified when creating CarbonContext")
val options = new CarbonOption(parameters)
- val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
- val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
+ val tablePath = new Path(
+ CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession))
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
.exists(tablePath)
val (doSave, doAppend) = (mode, isExists) match {
case (SaveMode.ErrorIfExists, true) =>
- CarbonException.analysisException(s"path $storePath already exists.")
+ CarbonException.analysisException(s"table path already exists.")
case (SaveMode.Overwrite, true) =>
+ val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
sqlContext.sparkSession
- .sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
+ .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
@@ -124,8 +123,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
dataSchema: StructType): BaseRelation = {
CarbonEnv.getInstance(sqlContext.sparkSession)
addLateDecodeOptimization(sqlContext.sparkSession)
- val dbName: String = parameters.getOrElse("dbName",
- CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+ val dbName: String =
+ CarbonEnv.getDatabaseName(parameters.get("dbName"))(sqlContext.sparkSession)
val tableOption: Option[String] = parameters.get("tableName")
if (tableOption.isEmpty) {
CarbonException.analysisException("Table creation failed. Table name is not specified")
@@ -154,27 +153,27 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
}
- private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
- dataSchema: StructType) = {
+ private def createTableIfNotExists(
+ sparkSession: SparkSession,
+ parameters: Map[String, String],
+ dataSchema: StructType): (String, Map[String, String]) = {
- val dbName: String = parameters.getOrElse("dbName",
- CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+ val dbName: String = CarbonEnv.getDatabaseName(parameters.get("dbName"))(sparkSession)
val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
try {
- if (parameters.contains("carbonSchemaPartsNo")) {
- getPathForTable(sparkSession, dbName, tableName, parameters)
- } else {
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession)
- (CarbonProperties.getStorePath + s"/$dbName/$tableName", parameters)
- }
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ (carbonTable.getTablePath, parameters)
} catch {
- case ex: NoSuchTableException =>
+ case _: NoSuchTableException =>
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val updatedParams =
- CarbonSource.updateAndCreateTable(dataSchema, sparkSession, metaStore, parameters)
- getPathForTable(sparkSession, dbName, tableName, updatedParams)
+ val identifier = AbsoluteTableIdentifier.from(
+ CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),
+ dbName,
+ tableName)
+ val updatedParams = CarbonSource.updateAndCreateTable(
+ identifier, dataSchema, sparkSession, metaStore, parameters)
+ (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
}
@@ -203,8 +202,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
} else if (!sparkSession.isInstanceOf[CarbonSession]) {
(CarbonProperties.getStorePath + "/" + dbName + "/" + tableName, parameters)
} else {
- val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- (carbonTable.getTablePath, parameters)
+ (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), parameters)
}
} catch {
case ex: Exception =>
@@ -222,54 +220,44 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
outputMode: OutputMode): Sink = {
// check "tablePath" option
- val tablePathOption = parameters.get("tablePath")
- val dbName: String = parameters.getOrElse("dbName",
- CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
- val tableOption: Option[String] = parameters.get("tableName")
- if (tableOption.isEmpty) {
- throw new CarbonStreamException("Table creation failed. Table name is not specified")
- }
- val tableName = tableOption.get.toLowerCase()
+ val options = new CarbonOption(parameters)
+ val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
+ val tableName = options.tableName
if (tableName.contains(" ")) {
throw new CarbonStreamException("Table creation failed. Table name cannot contain blank " +
"space")
}
- if (tablePathOption.isDefined) {
- val sparkSession = sqlContext.sparkSession
- val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-
- if (!carbonTable.isStreamingTable) {
- throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
- s"${carbonTable.getTableName} is not a streaming table")
- }
-
- // create sink
- StreamSinkFactory.createStreamTableSink(
- sqlContext.sparkSession,
- sqlContext.sparkSession.sessionState.newHadoopConf(),
- carbonTable,
- parameters)
- } else {
- throw new CarbonStreamException("Require tablePath option for the write stream")
+ val sparkSession = sqlContext.sparkSession
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ if (!carbonTable.isStreamingTable) {
+ throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
+ s"${carbonTable.getTableName} is not a streaming table")
}
+
+ // create sink
+ StreamSinkFactory.createStreamTableSink(
+ sqlContext.sparkSession,
+ sqlContext.sparkSession.sessionState.newHadoopConf(),
+ carbonTable,
+ parameters)
}
}
object CarbonSource {
- def createTableInfoFromParams(parameters: Map[String, String],
+ def createTableInfoFromParams(
+ parameters: Map[String, String],
dataSchema: StructType,
- dbName: String,
- tableName: String): TableModel = {
+ identifier: AbsoluteTableIdentifier): TableModel = {
val sqlParser = new CarbonSpark2SqlParser
val fields = sqlParser.getFields(dataSchema)
val map = scala.collection.mutable.Map[String, String]()
parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
val options = new CarbonOption(parameters)
val bucketFields = sqlParser.getBucketFields(map, fields, options)
- sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
- tableName, fields, Nil, map, bucketFields)
+ sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
+ identifier.getTableName, fields, Nil, map, bucketFields)
}
/**
@@ -278,13 +266,23 @@ object CarbonSource {
* @param sparkSession
* @return
*/
- def updateCatalogTableWithCarbonSchema(tableDesc: CatalogTable,
+ def updateCatalogTableWithCarbonSchema(
+ tableDesc: CatalogTable,
sparkSession: SparkSession): CatalogTable = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val storageFormat = tableDesc.storage
val properties = storageFormat.properties
if (!properties.contains("carbonSchemaPartsNo")) {
- val map = updateAndCreateTable(tableDesc.schema, sparkSession, metaStore, properties)
+ val tablePath = CarbonEnv.getTablePath(
+ tableDesc.identifier.database, tableDesc.identifier.table)(sparkSession)
+ val dbName = CarbonEnv.getDatabaseName(tableDesc.identifier.database)(sparkSession)
+ val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableDesc.identifier.table)
+ val map = updateAndCreateTable(
+ identifier,
+ tableDesc.schema,
+ sparkSession,
+ metaStore,
+ properties)
// updating params
val updatedFormat = storageFormat.copy(properties = map)
tableDesc.copy(storage = updatedFormat)
@@ -292,7 +290,7 @@ object CarbonSource {
val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
if (!metaStore.isReadFromHiveMetaStore) {
// save to disk
- metaStore.saveToDisk(tableInfo, properties.get("tablePath").get)
+ metaStore.saveToDisk(tableInfo, properties("tablePath"))
// remove schema string from map as we don't store carbon schema to hive metastore
val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
@@ -303,36 +301,26 @@ object CarbonSource {
}
}
- def updateAndCreateTable(dataSchema: StructType,
+ def updateAndCreateTable(
+ identifier: AbsoluteTableIdentifier,
+ dataSchema: StructType,
sparkSession: SparkSession,
metaStore: CarbonMetaStore,
properties: Map[String, String]): Map[String, String] = {
- val dbName: String = properties.getOrElse("dbName",
- CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
- val tableName: String = properties.getOrElse("tableName", "").toLowerCase
- val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
- val tableInfo: TableInfo = TableNewProcessor(model)
- val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath)
- val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+ val model = createTableInfoFromParams(properties, dataSchema, identifier)
+ val tableInfo: TableInfo = TableNewProcessor(model, identifier)
val schemaEvolutionEntry = new SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
- tableInfo.getFactTable.getSchemaEvalution.
- getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+ tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
val map = if (metaStore.isReadFromHiveMetaStore) {
- val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
- val schemaMetadataPath =
- CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
- tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setTablePath(tableIdentifier.getTablePath)
CarbonUtil.convertToMultiStringMap(tableInfo)
} else {
- metaStore.saveToDisk(tableInfo, tablePath)
+ metaStore.saveToDisk(tableInfo, identifier.getTablePath)
new java.util.HashMap[String, String]()
}
properties.foreach(e => map.put(e._1, e._2))
- map.put("tablePath", tablePath)
- map.put("dbname", dbName)
+ map.put("tablepath", identifier.getTablePath)
+ map.put("dbname", identifier.getDatabaseName)
map.asScala.toMap
}
}
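The companion object above now keys schema creation off a single AbsoluteTableIdentifier instead of a separate dbName, tableName and store path. A minimal sketch of calling the reworked helper, assuming only the CarbonEnv and CarbonSource methods visible in these hunks; the path, schema and property values are illustrative, not taken from the patch:

    // Sketch under the assumptions stated above; not the committed code.
    val identifier = AbsoluteTableIdentifier.from(
      "/user/warehouse/default/t1",   // illustrative tablePath
      "default",                      // database name (illustrative)
      "t1")                           // table name (illustrative)
    val serialized = CarbonSource.updateAndCreateTable(
      identifier, dataSchema, sparkSession, metaStore, properties)
    // The returned map carries the resolved location under "tablePath",
    // which the saveToDisk branch above reads back on the next round trip.
    assert(serialized("tablePath") == identifier.getTablePath)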
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 622bf0a..f90abb8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -60,7 +60,7 @@ case class CarbonCreateDataMapCommand(
} else {
val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
- val dbName = GetDB.getDatabaseName(tableIdentifier.database, sparkSession)
+ val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
// updating the parent table with the data map schema
PreAggregateUtil.updateMainTable(dbName, tableIdentifier.table, dataMapSchema, sparkSession)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 46c803d..9a71523 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.datamap
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
@@ -54,14 +54,12 @@ case class CarbonDropDataMapCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+ val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val identifier = TableIdentifier(tableName, Option(dbName))
val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetastore
- val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonProperties.getStorePath)
- val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+ val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
val tableIdentifier =
AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
catalog.checkSchemasModifiedTimeAndReloadTables()
@@ -136,11 +134,7 @@ case class CarbonDropDataMapCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
// delete the table folder
- val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
- val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonProperties.getStorePath)
- val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
- val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+ val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
Seq.empty
}
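Both command hunks replace the hand-built `databaseLocation + FILE_SEPARATOR + tableName` path with CarbonEnv helpers. A sketch of what CarbonEnv.getIdentifier presumably does, under the assumption that it simply composes the two helpers already used in processMetadata above; the committed implementation may differ:

    // Assumed composition, shown for orientation only.
    def getIdentifier(
        databaseNameOp: Option[String],
        tableName: String)(sparkSession: SparkSession): AbsoluteTableIdentifier = {
      val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
      val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
      AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
    }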
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
index 45f99fd..d37ca0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
@@ -32,9 +32,9 @@ object DataMapDropTablePostListener extends OperationEventListener {
val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
val carbonTable = dropPostEvent.carbonTable
val sparkSession = dropPostEvent.sparkSession
- if (carbonTable.isDefined && carbonTable.get.hasDataMapSchema) {
+ if (carbonTable.hasDataMapSchema) {
// drop all child tables
- val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
+ val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
childSchemas.asScala
.filter(_.getRelationIdentifier != null)
.foreach { childSchema =>
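Since carbonTable is now a plain CarbonTable rather than an Option, listeners no longer need isDefined/get guards. A minimal listener sketch in the same shape as DataMapDropTablePostListener above; the onEvent signature and the OperationContext parameter are assumptions inferred from the listener-bus usage elsewhere in this patch:

    import org.apache.carbondata.events.{DropTablePostEvent, Event,
      OperationContext, OperationEventListener}

    object AuditDropTableListener extends OperationEventListener {
      // Assumed callback signature for OperationEventListener.
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
        val table = dropPostEvent.carbonTable   // direct access, no .get
        println(s"dropped ${table.getDatabaseName}.${table.getTableName}")
      }
    }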
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 0011395..eacfded 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,14 +17,12 @@
package org.apache.spark.sql.execution.command.management
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{Checker, DataCommand, DataProcessOperation, RunnableCommand}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.util.CommonUtil
/**
* Clean data in table
@@ -65,9 +63,8 @@ case class CarbonCleanFilesCommand(
private def deleteAllData(sparkSession: SparkSession,
databaseNameOp: Option[String], tableName: String): Unit = {
- val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
- val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonProperties.getStorePath)
+ val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+ val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
CarbonStore.cleanFiles(
dbName,
tableName,
@@ -85,16 +82,14 @@ case class CarbonCleanFilesCommand(
OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
CarbonStore.cleanFiles(
- GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
tableName,
CarbonProperties.getStorePath,
carbonTable,
forceTableClean)
- val cleanFilesPostEvent: CleanFilesPostEvent =
- CleanFilesPostEvent(carbonTable,
- sparkSession)
- OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
+ val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession)
+ OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
}
private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {
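The last hunk above also fixes a copy-paste slip: the post listener was previously fired with cleanFilesPreEvent. Condensed, the intended pre/post pattern (names exactly as in this file) is:

    val preEvent = CleanFilesPreEvent(carbonTable, sparkSession)
    OperationListenerBus.getInstance.fireEvent(preEvent)

    // ... CarbonStore.cleanFiles(...) does the actual work ...

    val postEvent = CleanFilesPostEvent(carbonTable, sparkSession)
    OperationListenerBus.getInstance.fireEvent(postEvent)  // post event, not pre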
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 06eb657..a2819cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command.management
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
@@ -42,7 +42,7 @@ case class CarbonDeleteLoadByIdCommand(
CarbonStore.deleteLoadById(
loadIds,
- GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
tableName,
carbonTable
)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dbfb030..490bb58 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command.management
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
@@ -42,7 +42,7 @@ case class CarbonDeleteLoadByLoadDateCommand(
CarbonStore.deleteLoadByDate(
loadDate,
- GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
tableName,
carbonTable)
val deleteSegmentPostEvent: DeleteSegmentByDatePostEvent =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 9d21468..ff13299 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{CausedBy, FileUtils}
@@ -59,11 +59,7 @@ case class CarbonLoadDataCommand(
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None,
var tableInfoOp: Option[TableInfo] = None)
- extends RunnableCommand with DataProcessOperation {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
+ extends DataCommand {
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -75,8 +71,6 @@ case class CarbonLoadDataCommand(
}
}
- val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -99,6 +93,7 @@ case class CarbonLoadDataCommand(
// update the property with new value
carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
+ val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
try {
val table = if (tableInfoOp.isDefined) {
@@ -218,8 +213,7 @@ case class CarbonLoadDataCommand(
table.getTableName + "/"
val fileType = FileFactory.getFileType(partitionLocation)
if (FileFactory.isFileExist(partitionLocation, fileType)) {
- val file = FileFactory
- .getCarbonFile(partitionLocation, fileType)
+ val file = FileFactory.getCarbonFile(partitionLocation, fileType)
CarbonUtil.deleteFoldersAndFiles(file)
}
} catch {
@@ -267,7 +261,6 @@ case class CarbonLoadDataCommand(
dimensions,
carbonLoadModel,
sparkSession.sqlContext,
- carbonLoadModel.getTablePath,
dictFolderPath)
}
if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
@@ -275,7 +268,6 @@ case class CarbonLoadDataCommand(
GlobalDictionaryUtil
.generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getTablePath,
carbonTableIdentifier,
dictFolderPath,
dimensions,
@@ -334,10 +326,11 @@ case class CarbonLoadDataCommand(
val getSegIdUDF = udf((tupleId: String) =>
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
// getting all fields except tupleId field as it is not required in the value
- var otherFields = fields.toSeq
- .filter(field => !field.name
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- .map(field => new Column(field.name))
+ var otherFields = fields.toSeq.filter { field =>
+ !field.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+ }.map { field =>
+ new Column(field.name)
+ }
// extract tupleId field which will be used as a key
val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
@@ -356,10 +349,10 @@ case class CarbonLoadDataCommand(
GlobalDictionaryUtil.generateGlobalDictionary(
sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getTablePath,
hadoopConf,
dictionaryDataFrame)
- CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+ CarbonDataRDDFactory.loadCarbonData(
+ sparkSession.sqlContext,
carbonLoadModel,
carbonLoadModel.getTablePath,
columnar,
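Dropping the hand-written run override works because DataCommand supplies the same forwarding. A sketch of the assumed trait shape that makes `extends DataCommand` sufficient here; this is inferred from the removed override, not copied from the source:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.{DataProcessOperation, RunnableCommand}

    // Assumed shape: DataProcessOperation is taken to declare processData.
    trait DataCommand extends RunnableCommand with DataProcessOperation {
      override def run(sparkSession: SparkSession): Seq[Row] = {
        processData(sparkSession)
      }
    }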
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index c6898b2..e0311cb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command.management
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.types.{StringType, TimestampType}
@@ -43,8 +43,6 @@ case class CarbonShowLoadsCommand(
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
CarbonStore.showSegments(
- GetDB.getDatabaseName(databaseNameOp, sparkSession),
- tableName,
limit,
carbonTable.getMetaDataFilepath
)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 3d65862..ecc48cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.mutation
import org.apache.spark.sql.{CarbonEnv, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -34,7 +33,8 @@ import org.apache.carbondata.processing.loading.FailureCauses
*/
private[sql] case class CarbonProjectForDeleteCommand(
plan: LogicalPlan,
- identifier: Seq[String],
+ databaseNameOp: Option[String],
+ tableName: String,
timestamp: String)
extends DataCommand {
@@ -44,10 +44,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
val dataFrame = Dataset.ofRows(sparkSession, plan)
val dataRdd = dataFrame.rdd
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
// trigger event for Delete from table
val operationContext = new OperationContext
@@ -62,7 +59,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
try {
lockStatus = metadataLock.lockWithRetries()
LOGGER.audit(s" Delete data request has been received " +
- s"for ${ relation.databaseName }.${ relation.tableName }.")
+ s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
if (lockStatus) {
LOGGER.info("Successfully able to get the table metadata file lock")
} else {
@@ -73,10 +70,16 @@ private[sql] case class CarbonProjectForDeleteCommand(
// handle the clean up of IUD.
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
- if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
- isUpdateOperation = false, executorErrors)) {
+ if (DeleteExecution.deleteDeltaExecution(
+ databaseNameOp,
+ tableName,
+ sparkSession,
+ dataRdd,
+ timestamp,
+ isUpdateOperation = false,
+ executorErrors)) {
// call IUD Compaction.
- HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
+ HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
isUpdateOperation = false)
// trigger post event for Delete from table
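The Seq[String] identifier is gone in favor of an explicit (databaseNameOp, tableName) pair, which turns the metastore lookup into a one-liner. For contrast, a sketch of both forms; the second is the pattern these hunks remove, and the TableIdentifier construction shown is illustrative rather than the removed DeleteExecution helper:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.hive.CarbonRelation

    // New, direct lookup used above:
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)

    // Removed pattern: resolve a CarbonRelation through the metastore, then unwrap.
    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
      .lookupRelation(TableIdentifier(tableName, databaseNameOp))(sparkSession)
      .asInstanceOf[CarbonRelation]
    val sameTable = relation.carbonTable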
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 3aadec3..75008ad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -35,11 +34,12 @@ import org.apache.carbondata.processing.loading.FailureCauses
private[sql] case class CarbonProjectForUpdateCommand(
plan: LogicalPlan,
- tableIdentifier: Seq[String])
+ databaseNameOp: Option[String],
+ tableName: String)
extends DataCommand {
override def processData(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(CarbonProjectForUpdateCommand.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
val res = plan find {
case relation: LogicalRelation if relation.relation
@@ -51,10 +51,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
if (res.isEmpty) {
return Seq.empty
}
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
// trigger event for Update table
val operationContext = new OperationContext
@@ -94,22 +91,35 @@ private[sql] case class CarbonProjectForUpdateCommand(
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
// do delete operation.
- DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
- currentTime + "", isUpdateOperation = true, executionErrors)
-
- if(executionErrors.failureCauses != FailureCauses.NONE) {
+ DeleteExecution.deleteDeltaExecution(
+ databaseNameOp,
+ tableName,
+ sparkSession,
+ dataSet.rdd,
+ currentTime + "",
+ isUpdateOperation = true,
+ executionErrors)
+
+ if (executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
}
// do update operation.
- performUpdate(dataSet, tableIdentifier, plan, sparkSession, currentTime, executionErrors)
-
- if(executionErrors.failureCauses != FailureCauses.NONE) {
+ performUpdate(dataSet,
+ databaseNameOp,
+ tableName,
+ plan,
+ sparkSession,
+ currentTime,
+ executionErrors)
+
+ if (executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
}
// Do IUD Compaction.
- HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+ HorizontalCompaction.tryHorizontalCompaction(
+ sparkSession, carbonTable, isUpdateOperation = true)
// trigger event for Update table
val updateTablePostEvent: UpdateTablePostEvent =
@@ -150,21 +160,21 @@ private[sql] case class CarbonProjectForUpdateCommand(
private def performUpdate(
dataFrame: Dataset[Row],
- tableIdentifier: Seq[String],
+ databaseNameOp: Option[String],
+ tableName: String,
plan: LogicalPlan,
sparkSession: SparkSession,
currentTime: Long,
executorErrors: ExecutionErrors): Unit = {
def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
-
- val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
- val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
- (tableIdentifier.size > 1 &&
- tableIdentifier(0) == dbName &&
- tableIdentifier(1) == tableName) ||
- (tableIdentifier(0) == tableName)
+ val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+ dbName == relation.identifier.getCarbonTableIdentifier.getDatabaseName &&
+ tableName == relation.identifier.getCarbonTableIdentifier.getTableName
}
+
def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
var header = ""
var found = false