Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:40 UTC
[02/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b7673db..9353a92 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -19,426 +19,37 @@ package org.apache.spark.sql.execution.command
import java.io.File
import java.text.SimpleDateFormat
-import java.util
-import java.util.UUID
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map}
import scala.language.implicitConversions
-import scala.util.Random
-import org.apache.spark.SparkEnv
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
-import org.apache.spark.sql.hive.{CarbonHiveMetadataUtil, HiveContext}
+import org.apache.spark.sql.hive.CarbonMetastore
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.util.FileUtils
import org.codehaus.jackson.map.ObjectMapper
-import org.apache.carbondata.common.factory.CarbonCommonFactory
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
-import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
-ColumnSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.carbon.path.CarbonStorePath
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.load.LoadMetadataDetails
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.integration.spark.merger.CompactionType
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
-GlobalDictionaryUtil}
-
-case class tableModel(
- ifNotExistsSet: Boolean,
- var databaseName: String,
- databaseNameOp: Option[String],
- tableName: String,
- tableProperties: Map[String, String],
- dimCols: Seq[Field],
- msrCols: Seq[Field],
- highcardinalitydims: Option[Seq[String]],
- noInvertedIdxCols: Option[Seq[String]],
- partitioner: Option[Partitioner],
- columnGroups: Seq[String],
- colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
-
-case class Field(column: String, var dataType: Option[String], name: Option[String],
- children: Option[List[Field]], parent: String = null,
- storeType: Option[String] = Some("columnar"),
- var precision: Int = 0, var scale: Int = 0)
-
-case class ColumnProperty(key: String, value: String)
-
-case class ComplexField(complexType: String, primitiveField: Option[Field],
- complexField: Option[ComplexField])
-
-case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
- nodeList: Array[String])
-
-case class PartitionerField(partitionColumn: String, dataType: Option[String],
- columnComment: String)
-
-case class DataLoadTableFileMapping(table: String, loadPath: String)
-
-case class CarbonMergerMapping(storeLocation: String,
- storePath: String,
- metadataFilePath: String,
- mergedLoadName: String,
- kettleHomePath: String,
- tableCreationTime: Long,
- databaseName: String,
- factTableName: String,
- validSegments: Array[String],
- tableId: String,
- // maxSegmentColCardinality is the column cardinality of the last segment of the compaction
- var maxSegmentColCardinality: Array[Int],
- // maxSegmentColumnSchemaList is the list of column schemas of the last segment of the compaction
- var maxSegmentColumnSchemaList: List[ColumnSchema])
-
-case class NodeInfo(TaskId: String, noOfBlocks: Int)
-
-case class AlterTableModel(dbName: Option[String], tableName: String,
- compactionType: String, alterSql: String)
-
-case class CompactionModel(compactionSize: Long,
- compactionType: CompactionType,
- carbonTable: CarbonTable,
- tableCreationTime: Long,
- isDDLTrigger: Boolean)
-
-case class CompactionCallableModel(storePath: String,
- carbonLoadModel: CarbonLoadModel,
- storeLocation: String,
- carbonTable: CarbonTable,
- kettleHomePath: String,
- cubeCreationTime: Long,
- loadsToMerge: util.List[LoadMetadataDetails],
- sqlContext: SQLContext,
- compactionType: CompactionType)
-
-object TableNewProcessor {
- def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
- new TableNewProcessor(cm, sqlContext).process
- }
-}
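
[Editor's aside: TableNewProcessor pairs a class with a companion-object apply, so callers invoke TableNewProcessor(cm, sqlContext) and get the processed TableInfo in one call. A minimal, hedged sketch of the pattern with illustrative names:]

object Processor {
  // one-shot entry point, mirroring TableNewProcessor.apply
  def apply(input: String): String = new Processor(input).process
}

class Processor(input: String) {
  def process: String = input.toUpperCase
}

// Processor("dimCols") returns "DIMCOLS"
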
-
-class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
-
- var index = 0
- var rowGroup = 0
-
- def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
- var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
- fieldChildren.foreach(fields => {
- fields.foreach(field => {
- val encoders = new java.util.ArrayList[Encoding]()
- encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column), index,
- isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale)
- allColumns ++= Seq(columnSchema)
- index = index + 1
- rowGroup = rowGroup + 1
- if (field.children.get != null) {
- columnSchema.setNumberOfChild(field.children.get.size)
- allColumns ++= getAllChildren(field.children)
- }
- })
- })
- allColumns
- }
-
- def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
- encoders: java.util.List[Encoding], isDimensionCol: Boolean,
- colGroup: Integer, precision: Integer, scale: Integer): ColumnSchema = {
- val columnSchema = new ColumnSchema()
- columnSchema.setDataType(dataType)
- columnSchema.setColumnName(colName)
- val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
- if (highCardinalityDims.contains(colName)) {
- encoders.remove(Encoding.DICTIONARY)
- }
- if (dataType == DataType.TIMESTAMP) {
- encoders.add(Encoding.DIRECT_DICTIONARY)
- }
- val colPropMap = new java.util.HashMap[String, String]()
- if (cm.colProps.isDefined && null != cm.colProps.get.get(colName)) {
- val colProps = cm.colProps.get.get(colName)
- colProps.asScala.foreach { x => colPropMap.put(x.key, x.value) }
- }
- columnSchema.setColumnProperties(colPropMap)
- columnSchema.setEncodingList(encoders)
- val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
- val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
- columnSchema)
- columnSchema.setColumnUniqueId(columnUniqueId)
- columnSchema.setColumnReferenceId(columnUniqueId)
- columnSchema.setColumnar(isCol)
- columnSchema.setDimensionColumn(isDimensionCol)
- columnSchema.setColumnGroup(colGroup)
- columnSchema.setPrecision(precision)
- columnSchema.setScale(scale)
- // TODO: Need to fill RowGroupID, converted type
- // & Number of Children after DDL finalization
- columnSchema
- }
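
[Editor's aside: the encoder selection in getColumnSchema above follows one rule per column kind. A hedged standalone restatement; the helper name is illustrative, not the project's API:]

import org.apache.carbondata.core.carbon.metadata.datatype.DataType
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding

// Dimensions start with DICTIONARY; no-dictionary (high cardinality) columns drop it;
// TIMESTAMP columns additionally get DIRECT_DICTIONARY.
def chooseEncodings(dataType: DataType, isNoDictionary: Boolean): java.util.List[Encoding] = {
  val encoders = new java.util.ArrayList[Encoding]()
  encoders.add(Encoding.DICTIONARY)
  if (isNoDictionary) {
    encoders.remove(Encoding.DICTIONARY)
  }
  if (dataType == DataType.TIMESTAMP) {
    encoders.add(Encoding.DIRECT_DICTIONARY)
  }
  encoders
}
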
-
- // process create dml fields and create wrapper TableInfo object
- def process: TableInfo = {
- val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
- var allColumns = Seq[ColumnSchema]()
- var index = 0
- cm.dimCols.foreach(field => {
- val encoders = new java.util.ArrayList[Encoding]()
- encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column),
- index,
- isCol = true,
- encoders,
- isDimensionCol = true,
- -1,
- field.precision,
- field.scale)
- allColumns ++= Seq(columnSchema)
- index = index + 1
- if (field.children.isDefined && field.children.get != null) {
- columnSchema.setNumberOfChild(field.children.get.size)
- allColumns ++= getAllChildren(field.children)
- }
- })
-
- cm.msrCols.foreach(field => {
- val encoders = new java.util.ArrayList[Encoding]()
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column),
- index,
- isCol = true,
- encoders,
- isDimensionCol = false,
- -1,
- field.precision,
- field.scale)
- val measureCol = columnSchema
-
- allColumns ++= Seq(measureCol)
- index = index + 1
- })
-
- // Check if there are any duplicate measures or dimensions,
- // based on the dimension name and measure name.
- allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
- val name = f._1
- LOGGER.error(s"Duplicate column found with name: $name")
- LOGGER.audit(
- s"Validation failed for Create/Alter Table Operation " +
- s"for ${ cm.databaseName }.${ cm.tableName }" +
- s"Duplicate column found with name: $name")
- sys.error(s"Duplicate dimensions found with name: $name")
- })
-
- val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
-
- checkColGroupsValidity(cm.columnGroups, allColumns, highCardinalityDims)
-
- updateColumnGroupsInFields(cm.columnGroups, allColumns)
-
- var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
- val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
- val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
- for (column <- allColumns) {
- if (highCardinalityDims.contains(column.getColumnName)) {
- newOrderedDims += column
- } else if (column.isComplex) {
- complexDims += column
- } else if (column.isDimensionColumn) {
- newOrderedDims += column
- } else {
- measures += column
- }
-
- }
-
- // Setting the boolean value of useInvertedIndex in column schema
- val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
- for (column <- allColumns) {
- // When the column is measure or the specified no inverted index column in DDL,
- // set useInvertedIndex to false, otherwise true.
- if (noInvertedIndexCols.contains(column.getColumnName) ||
- cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
- column.setUseInvertedIndex(false)
- } else {
- column.setUseInvertedIndex(true)
- }
- }
-
- // Adding dummy measure if no measure is provided
- if (measures.size < 1) {
- val encoders = new java.util.ArrayList[Encoding]()
- val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
- CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
- index,
- true,
- encoders,
- false,
- -1, 0, 0)
- columnSchema.setInvisible(true)
- val measureColumn = columnSchema
- measures += measureColumn
- allColumns = allColumns ++ measures
- }
- val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
- columnValidator.validateColumns(allColumns)
- newOrderedDims = newOrderedDims ++ complexDims ++ measures
-
- cm.partitioner match {
- case Some(part: Partitioner) =>
- var definedpartCols = part.partitionColumn
- val columnBuffer = new ArrayBuffer[String]
- part.partitionColumn.foreach { col =>
- newOrderedDims.foreach { dim =>
- if (dim.getColumnName.equalsIgnoreCase(col)) {
- definedpartCols = definedpartCols.dropWhile { c => c.equals(col) }
- columnBuffer += col
- }
- }
- }
-
- // Special case where the partition count alone is sent to Carbon for data loading
- if (part.partitionClass.isEmpty) {
- if (part.partitionColumn(0).isEmpty) {
- Partitioner(
- "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
- Array(""), part.partitionCount, null)
- } else {
- // case where partition cols are set and partition class is not set.
- // so setting the default value.
- Partitioner(
- "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
- part.partitionColumn, part.partitionCount, null)
- }
- } else if (definedpartCols.nonEmpty) {
- val msg = definedpartCols.mkString(", ")
- LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
- LOGGER.audit(
- s"Validation failed for Create/Alter Table Operation for " +
- s"${ cm.databaseName }.${ cm.tableName } " +
- s"partition columns specified are not part of Dimension columns: $msg")
- sys.error(s"partition columns specified are not part of Dimension columns: $msg")
- } else {
-
- try {
- Class.forName(part.partitionClass).newInstance()
- } catch {
- case e: Exception =>
- val cl = part.partitionClass
- LOGGER.audit(
- s"Validation failed for Create/Alter Table Operation for " +
- s"${ cm.databaseName }.${ cm.tableName } " +
- s"partition class specified can not be found or loaded: $cl")
- sys.error(s"partition class specified can not be found or loaded: $cl")
- }
-
- Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
- }
- case None =>
- Partitioner("org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
- Array(""), 20, null)
- }
- val tableInfo = new TableInfo()
- val tableSchema = new TableSchema()
- val schemaEvol = new SchemaEvolution()
- schemaEvol
- .setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]())
- tableSchema.setTableId(UUID.randomUUID().toString)
- // populate table properties map
- val tablePropertiesMap = new java.util.HashMap[String, String]()
- cm.tableProperties.foreach {
- x => tablePropertiesMap.put(x._1, x._2)
- }
- tableSchema.setTableProperties(tablePropertiesMap)
- tableSchema.setTableName(cm.tableName)
- tableSchema.setListOfColumns(allColumns.asJava)
- tableSchema.setSchemaEvalution(schemaEvol)
- tableInfo.setDatabaseName(cm.databaseName)
- tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName)
- tableInfo.setLastUpdatedTime(System.currentTimeMillis())
- tableInfo.setFactTable(tableSchema)
- tableInfo.setAggregateTableList(new util.ArrayList[TableSchema]())
- tableInfo
- }
-
- // For checking if the specified col group columns are specified in fields list.
- protected def checkColGroupsValidity(colGrps: Seq[String],
- allCols: Seq[ColumnSchema],
- highCardCols: Seq[String]): Unit = {
- if (null != colGrps) {
- colGrps.foreach(columngroup => {
- val rowCols = columngroup.split(",")
- rowCols.foreach(colForGrouping => {
- var found: Boolean = false
- // check for dimensions + measures
- allCols.foreach(eachCol => {
- if (eachCol.getColumnName.equalsIgnoreCase(colForGrouping.trim())) {
- found = true
- }
- })
- // check for No Dictionary dimensions
- highCardCols.foreach(noDicCol => {
- if (colForGrouping.trim.equalsIgnoreCase(noDicCol)) {
- found = true
- }
- })
-
- if (!found) {
- sys.error(s"column $colForGrouping is not present in Field list")
- }
- })
- })
- }
- }
-
- // For updating the col group details for fields.
- private def updateColumnGroupsInFields(colGrps: Seq[String], allCols: Seq[ColumnSchema]): Unit = {
- if (null != colGrps) {
- var colGroupId = -1
- colGrps.foreach(columngroup => {
- colGroupId += 1
- val rowCols = columngroup.split(",")
- rowCols.foreach(row => {
-
- allCols.foreach(eachCol => {
-
- if (eachCol.getColumnName.equalsIgnoreCase(row.trim)) {
- eachCol.setColumnGroup(colGroupId)
- eachCol.setColumnar(false)
- }
- })
- })
- })
- }
- }
-}
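
[Editor's aside: the column-group logic removed above (updateColumnGroupsInFields) assigns one group id per comma-separated COLUMN_GROUPS entry and marks every member as non-columnar. A hedged sketch of that bookkeeping with dummy data:]

// Each "a,b" entry is one column group; members share a group id and are
// stored row-wise rather than column-wise.
val columnGroups = Seq("c1,c2", "c3")
columnGroups.zipWithIndex.foreach { case (group, groupId) =>
  group.split(",").map(_.trim).foreach { col =>
    println(s"column $col -> columnGroup = $groupId, columnar = false")
  }
}
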
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
/**
* Command for the compaction in alter table command
@@ -459,7 +70,7 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
}
val relation =
- CarbonEnv.getInstance(sqlContext).carbonCatalog
+ CarbonEnv.get.carbonMetastore
.lookupRelation1(Option(databaseName), tableName)(sqlContext)
.asInstanceOf[CarbonRelation]
if (relation == null) {
@@ -478,7 +89,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
- val partitioner = relation.tableMeta.partitioner
val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
var storeLocation = CarbonProperties.getInstance
@@ -507,7 +117,7 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
}
}
-case class CreateTable(cm: tableModel) extends RunnableCommand {
+case class CreateTable(cm: TableModel) extends RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -531,7 +141,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
}
} else {
// Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val catalog = CarbonEnv.get.carbonMetastore
// Need to fill partitioner class when we support partition
val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName, null)(sqlContext)
try {
@@ -544,7 +154,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
// call the drop table to delete the created table.
- CarbonEnv.getInstance(sqlContext).carbonCatalog
+ CarbonEnv.get.carbonMetastore
.dropTable(catalog.storePath, identifier)(sqlContext)
LOGGER.audit(s"Table creation with Database name [$dbName] " +
@@ -581,7 +191,7 @@ private[sql] case class DeleteLoadsById(
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
+ val relation = CarbonEnv.get.carbonMetastore.lookupRelation1(
identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
if (relation == null) {
LOGGER.audit(s"Delete segment by Id is failed. Table $dbName.$tableName does not exist")
@@ -591,7 +201,7 @@ private[sql] case class DeleteLoadsById(
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
if (null == carbonTable) {
- CarbonEnv.getInstance(sqlContext).carbonCatalog
+ CarbonEnv.get.carbonMetastore
.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
}
val path = carbonTable.getMetaDataFilepath
@@ -640,7 +250,7 @@ private[sql] case class DeleteLoadsByLoadDate(
LOGGER.audit("The delete segment by load date request has been received.")
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
if (relation == null) {
LOGGER
@@ -658,7 +268,7 @@ private[sql] case class DeleteLoadsByLoadDate(
val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
.getCarbonTable(dbName + '_' + tableName)
if (null == carbonTable) {
- var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ var relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
}
val path = carbonTable.getMetaDataFilepath()
@@ -683,6 +293,44 @@ private[sql] case class DeleteLoadsByLoadDate(
}
+object LoadTable {
+
+ def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ model: DictionaryLoadModel,
+ noDictDimension: Array[CarbonDimension]): Unit = {
+
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+ model.table)
+ val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+ // read TableInfo
+ val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+ // modify TableInfo
+ val columns = tableInfo.getFact_table.getTable_columns
+ for (i <- 0 until columns.size) {
+ if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+ columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+ }
+ }
+
+ // write TableInfo
+ CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+ // update Metadata
+ val catalog = CarbonEnv.get.carbonMetastore
+ catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+ model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+ // update CarbonDataLoadSchema
+ val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
+ model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
+ carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+ }
+
+}
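
[Editor's aside: updateTableMetadata is not called directly by the load path; it is installed as a callback on GlobalDictionaryUtil.updateTableMetadataFunc (see the assignment in LoadTable.run below), so dictionary generation can rewrite the schema when it promotes columns to no-dictionary. A hedged sketch of that function-valued hook, with illustrative names:]

// The generator holds a mutable function slot; the command fills it in before
// generation starts, avoiding a hard dependency in the other direction.
object DictGenerator {
  var updateTableMetadataFunc: (String, Array[String]) => Unit = (_, _) => ()

  def generate(tableName: String): Unit = {
    val promoted = Array("c1") // columns found too high-cardinality for a dictionary
    updateTableMetadataFunc(tableName, promoted)
  }
}

DictGenerator.updateTableMetadataFunc =
  (table, cols) => println(s"rewriting schema of $table for ${cols.mkString(",")}")
DictGenerator.generate("t1")
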
+
case class LoadTable(
databaseNameOp: Option[String],
tableName: String,
@@ -699,7 +347,6 @@ case class LoadTable(
def run(sqlContext: SQLContext): Seq[Row] = {
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- val identifier = TableIdentifier(tableName, Option(dbName))
if (isOverwriteExist) {
sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
}
@@ -710,7 +357,7 @@ case class LoadTable(
sys.error(s"Data loading failed. table not found: $dbName.$tableName")
}
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(Option(dbName), tableName)(sqlContext)
.asInstanceOf[CarbonRelation]
if (relation == null) {
@@ -725,15 +372,13 @@ case class LoadTable(
try {
if (carbonLock.lockWithRetries()) {
logInfo("Successfully able to get the table metadata file lock")
- }
- else {
+ } else {
sys.error("Table is locked for updation. Please try after some time")
}
val factPath = if (dataFrame.isDefined) {
""
- }
- else {
+ } else {
FileUtils.getPaths(
CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
}
@@ -743,8 +388,7 @@ case class LoadTable(
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
if (dimFilesPath.isEmpty) {
carbonLoadModel.setDimFolderPath(null)
- }
- else {
+ } else {
val x = dimFilesPath.map(f => f.table + ":" + CarbonUtil.checkAndAppendHDFSUrl(f.loadPath))
carbonLoadModel.setDimFolderPath(x.mkString(","))
}
@@ -755,9 +399,8 @@ case class LoadTable(
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- var partitionLocation = relation.tableMeta.storePath + "/partition/" +
+ val partitionLocation = relation.tableMeta.storePath + "/partition/" +
relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
relation.tableMeta.carbonTableIdentifier.getTableName + "/"
@@ -830,18 +473,16 @@ case class LoadTable(
// set local dictionary path, and dictionary file extension
carbonLoadModel.setAllDictPath(allDictionaryPath)
- var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+ val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
try {
// First the system has to partition the data and then call the data load
- if (null == relation.tableMeta.partitioner.partitionColumn ||
- relation.tableMeta.partitioner.partitionColumn(0).isEmpty) {
- LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
- carbonLoadModel.setFactFilePath(factPath)
- carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
- carbonLoadModel.setCsvHeader(fileHeader)
- carbonLoadModel.setColDictFilePath(columnDict)
- carbonLoadModel.setDirectLoad(true)
- }
+ LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+ carbonLoadModel.setFactFilePath(factPath)
+ carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
+ carbonLoadModel.setCsvHeader(fileHeader)
+ carbonLoadModel.setColDictFilePath(columnDict)
+ carbonLoadModel.setDirectLoad(true)
+ GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
dataFrame)
@@ -849,7 +490,6 @@ case class LoadTable(
carbonLoadModel,
relation.tableMeta.storePath,
kettleHomePath,
- relation.tableMeta.partitioner,
columinar,
partitionStatus,
useKettle,
@@ -934,7 +574,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val carbonLock = CarbonLockFactory
.getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
- val storePath = CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath
+ val storePath = CarbonEnv.get.carbonMetastore.storePath
var isLocked = false
try {
isLocked = carbonLock.lockWithRetries()
@@ -946,7 +586,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
sys.error("Table is locked for deletion. Please try after some time")
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
- CarbonEnv.getInstance(sqlContext).carbonCatalog.dropTable(storePath, identifier)(sqlContext)
+ CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
} finally {
if (carbonLock != null && isLocked) {
@@ -988,7 +628,7 @@ private[sql] case class ShowLoads(
val tableUniqueName = databaseName + "_" + tableName
// Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata if
// the schema has been changed by another process, so that tableInfoMap would be refilled.
- val tableExists = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val tableExists = CarbonEnv.get.carbonMetastore
.tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
if (!tableExists) {
sys.error(s"$databaseName.$tableName is not found")
@@ -1045,7 +685,7 @@ private[sql] case class DescribeCommandFormatted(
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
val mapper = new ObjectMapper()
val colProps = StringBuilder.newBuilder
@@ -1130,7 +770,7 @@ private[sql] case class DeleteLoadByDate(
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
var level: String = ""
val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
@@ -1156,7 +796,7 @@ private[sql] case class DeleteLoadByDate(
new CarbonDataLoadSchema(carbonTable),
dbName,
tableName,
- CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath,
+ CarbonEnv.get.carbonMetastore.storePath,
level,
actualColName,
dateValue)
@@ -1176,7 +816,7 @@ private[sql] case class CleanFiles(
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val relation = CarbonEnv.get.carbonMetastore
.lookupRelation1(identifier)(sqlContext).
asInstanceOf[CarbonRelation]
if (relation == null) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
new file mode 100644
index 0000000..bee891c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io._
+import java.util.{GregorianCalendar, UUID}
+
+import scala.Array.canBuildFrom
+import scala.collection.mutable.ArrayBuffer
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.sql.hive.client.ClientInterface
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType
+import org.apache.carbondata.core.reader.ThriftReader
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.lcm.locks.ZookeeperInit
+import org.apache.carbondata.spark.merger.TableMeta
+
+case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
+
+case class CarbonMetaData(dims: Seq[String],
+ msrs: Seq[String],
+ carbonTable: CarbonTable,
+ dictionaryMap: DictionaryMap)
+
+object CarbonMetastore {
+
+ def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
+ val createTBase = new ThriftReader.TBaseCreator() {
+ override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
+ new TableInfo()
+ }
+ }
+ val thriftReader = new ThriftReader(schemaFilePath, createTBase)
+ var tableInfo: TableInfo = null
+ try {
+ thriftReader.open()
+ tableInfo = thriftReader.read().asInstanceOf[TableInfo]
+ } finally {
+ thriftReader.close()
+ }
+ tableInfo
+ }
+
+ def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
+ val thriftWriter = new ThriftWriter(schemaFilePath, false)
+ try {
+ thriftWriter.open()
+ thriftWriter.write(tableInfo)
+ } finally {
+ thriftWriter.close()
+ }
+ }
+
+}
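
[Editor's aside: a hedged usage sketch of the two helpers above: read the thrift TableInfo, mutate it in place, and write it back. The path is illustrative:]

val schemaFilePath = "/store/default/t1/Metadata/schema" // hypothetical location
val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
// ... mutate the thrift TableInfo here, e.g. adjust a column's encoder list ...
CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
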
+
+case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
+ def get(name: String): Option[Boolean] = {
+ dictionaryMap.get(name.toLowerCase)
+ }
+}
+
+class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
+ client: ClientInterface, queryId: String) extends HiveMetastoreCatalog(client, hiveContext) {
+
+ @transient
+ val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
+
+ val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
+ tableModifiedTimeStore
+ .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
+
+ val metadata = loadMetadata(storePath)
+
+ def getTableCreationTime(databaseName: String, tableName: String): Long = {
+ val tableMeta = metadata.tablesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+ val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime
+ tableCreationTime
+ }
+
+ def lookupRelation1(dbName: Option[String],
+ tableName: String)(sqlContext: SQLContext): LogicalPlan = {
+ lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
+ }
+
+ def lookupRelation1(tableIdentifier: TableIdentifier,
+ alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
+ checkSchemasModifiedTimeAndReloadTables()
+ val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+ val tables = getTableFromMetadata(database, tableIdentifier.table)
+ tables match {
+ case Some(t) =>
+ CarbonRelation(database, tableIdentifier.table,
+ CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext)
+ case None =>
+ LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
+ throw new NoSuchTableException
+ }
+ }
+
+ /**
+ * This method will search for a table in the catalog metadata
+ *
+ * @param database
+ * @param tableName
+ * @return
+ */
+ def getTableFromMetadata(database: String,
+ tableName: String): Option[TableMeta] = {
+ metadata.tablesMeta
+ .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+ }
+
+ def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
+ checkSchemasModifiedTimeAndReloadTables()
+ val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+ val tables = metadata.tablesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ tables.nonEmpty
+ }
+
+ def loadMetadata(metadataPath: String): MetaData = {
+ val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+ val statistic = new QueryStatistic()
+ // create the zookeeper instance once if zookeeper is configured as the carbon lock type
+ val zookeeperUrl: String = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
+ if (zookeeperUrl != null) {
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
+ ZookeeperInit.getInstance(zookeeperUrl)
+ LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
+ var configuredLockType = CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE)
+ if (null == configuredLockType) {
+ configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.LOCK_TYPE,
+ configuredLockType)
+ }
+ }
+
+ if (metadataPath == null) {
+ return null
+ }
+ val fileType = FileFactory.getFileType(metadataPath)
+ val metaDataBuffer = new ArrayBuffer[TableMeta]
+ fillMetaData(metadataPath, fileType, metaDataBuffer)
+ updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+ statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
+ System.currentTimeMillis())
+ recorder.recordStatisticsForDriver(statistic, queryId)
+ MetaData(metaDataBuffer)
+ }
+
+ private def fillMetaData(basePath: String, fileType: FileType,
+ metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+ val databasePath = basePath // + "/schemas"
+ try {
+ if (FileFactory.isFileExist(databasePath, fileType)) {
+ val file = FileFactory.getCarbonFile(databasePath, fileType)
+ val databaseFolders = file.listFiles()
+
+ databaseFolders.foreach(databaseFolder => {
+ if (databaseFolder.isDirectory) {
+ val dbName = databaseFolder.getName
+ val tableFolders = databaseFolder.listFiles()
+
+ tableFolders.foreach(tableFolder => {
+ if (tableFolder.isDirectory) {
+ val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
+ tableFolder.getName, UUID.randomUUID().toString)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
+ carbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableName = tableFolder.getName
+ val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
+
+
+ val createTBase = new ThriftReader.TBaseCreator() {
+ override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
+ new TableInfo()
+ }
+ }
+ val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
+ thriftReader.open()
+ val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
+ thriftReader.close()
+
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+ val schemaFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+ wrapperTableInfo.setStorePath(storePath)
+ wrapperTableInfo
+ .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+ val carbonTable =
+ org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+ .getCarbonTable(tableUniqueName)
+ metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+ carbonTable)
+ }
+ }
+ })
+ }
+ })
+ } else {
+ // Create folders and files.
+ FileFactory.mkdirs(databasePath, fileType)
+ }
+ } catch {
+ case s: java.io.FileNotFoundException =>
+ // Create folders and files.
+ FileFactory.mkdirs(databasePath, fileType)
+ }
+ }
+
+ /**
+ *
+ * Prepares the Thrift schema from the wrapper TableInfo, writes it to the schema file,
+ * and loads the CarbonTable from the wrapper TableInfo.
+ *
+ */
+ def createTableFromThrift(
+ tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
+ dbName: String, tableName: String, partitioner: Partitioner)
+ (sqlContext: SQLContext): String = {
+ if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
+ sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+ }
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val thriftTableInfo = schemaConverter
+ .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+ thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+ .add(schemaEvolutionEntry)
+
+ val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+ tableInfo.getFactTable.getTableId)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val schemaFilePath = carbonTablePath.getSchemaFilePath
+ val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
+ tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ tableInfo.setStorePath(storePath)
+ CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+ val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+ CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
+
+ val fileType = FileFactory.getFileType(schemaMetadataPath)
+ if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+ FileFactory.mkdirs(schemaMetadataPath, fileType)
+ }
+ val thriftWriter = new ThriftWriter(schemaFilePath, false)
+ thriftWriter.open()
+ thriftWriter.write(thriftTableInfo)
+ thriftWriter.close()
+ metadata.tablesMeta += tableMeta
+ logInfo(s"Table $tableName for Database $dbName created successfully.")
+ LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+ carbonTablePath.getPath
+ }
+
+ private def updateMetadataByWrapperTable(
+ wrapperTableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo): Unit = {
+
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+ wrapperTableInfo.getTableUniqueName)
+ for (i <- metadata.tablesMeta.indices) {
+ if (wrapperTableInfo.getTableUniqueName.equals(
+ metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
+ metadata.tablesMeta(i).carbonTable = carbonTable
+ }
+ }
+ }
+
+ def updateMetadataByThriftTable(schemaFilePath: String,
+ tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+
+ tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis())
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+ wrapperTableInfo
+ .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+ wrapperTableInfo.setStorePath(storePath)
+ updateMetadataByWrapperTable(wrapperTableInfo)
+ }
+
+ /**
+ * Shows all tables for given schema.
+ */
+ def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
+
+ val dbName =
+ databaseName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
+ checkSchemasModifiedTimeAndReloadTables()
+ metadata.tablesMeta.filter { c =>
+ c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName)
+ }.map { c => (c.carbonTableIdentifier.getTableName, false) }
+ }
+
+ def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
+ val dbName = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+ val tableName = tableIdentifier.table
+
+ val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+ new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+ val fileType = FileFactory.getFileType(tablePath)
+ FileFactory.isFileExist(tablePath, fileType)
+ }
+
+ def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+ (sqlContext: SQLContext) {
+ val dbName = tableIdentifier.database.get
+ val tableName = tableIdentifier.table
+
+ val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+ new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+
+ val fileType = FileFactory.getFileType(metadataFilePath)
+
+ if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+ // while dropping, refresh the schema modified time so that changes made
+ // from another beeline session are picked up
+ checkSchemasModifiedTimeAndReloadTables
+ val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+ CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+ tableIdentifier.table)
+ metadataToBeRemoved match {
+ case Some(tableMeta) =>
+ metadata.tablesMeta -= tableMeta
+ org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+ .removeTable(dbName + "_" + tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+ case None =>
+ logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
+ }
+ CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
+ // discard cached table info in cachedDataSourceTables
+ sqlContext.catalog.refreshTable(tableIdentifier)
+ }
+ }
+
+ private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+ val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+ val timestampFileType = FileFactory.getFileType(timestampFile)
+ (timestampFile, timestampFileType)
+ }
+
+ /**
+ * This method will put the updated timestamp of schema file in the table modified time store map
+ *
+ * @param timeStamp
+ */
+ def updateSchemasUpdatedTime(timeStamp: Long) {
+ tableModifiedTimeStore.put("default", timeStamp)
+ }
+
+ /**
+ * This method will read the last modified timestamp of the schema timestamp file
+ *
+ * @param databaseName
+ * @param tableName
+ * @return
+ */
+ def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+ FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+ } else {
+ System.currentTimeMillis()
+ }
+ }
+
+ /**
+ * This method will check and create an empty schema timestamp file
+ *
+ * @param databaseName
+ * @param tableName
+ * @return
+ */
+ def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
+ LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+ FileFactory.createNewFile(timestampFile, timestampFileType)
+ }
+ val systemTime = System.currentTimeMillis()
+ FileFactory.getCarbonFile(timestampFile, timestampFileType)
+ .setLastModifiedTime(systemTime)
+ systemTime
+ }
+
+ def checkSchemasModifiedTimeAndReloadTables() {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+ if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+ if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
+ getLastModifiedTime ==
+ tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+ refreshCache()
+ }
+ }
+ }
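
[Editor's aside: the timestamp methods above form a small cross-process invalidation protocol: one shared modified-time file per store, touched on every schema change; each driver caches the last mtime it saw and reloads metadata on mismatch. The core of the idea, as a hedged Carbon-free sketch:]

// cachedTime plays the role of tableModifiedTimeStore; fileTime is the mtime
// of the shared timestamp file. Reload only when another process touched it.
var cachedTime = 0L
def maybeReload(fileTime: Long)(reload: () => Unit): Unit = {
  if (fileTime != cachedTime) {
    reload()
    cachedTime = fileTime
  }
}
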
+
+ def refreshCache() {
+ metadata.tablesMeta = loadMetadata(storePath).tablesMeta
+ }
+
+ def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
+ var schemaLastUpdatedTime = System.currentTimeMillis
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+ schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
+ .getLastModifiedTime
+ }
+ schemaLastUpdatedTime
+ }
+
+ def createDatabaseDirectory(dbName: String) {
+ val databasePath = storePath + File.separator + dbName
+ val fileType = FileFactory.getFileType(databasePath)
+ FileFactory.mkdirs(databasePath, fileType)
+ }
+
+ def dropDatabaseDirectory(dbName: String) {
+ val databasePath = storePath + File.separator + dbName
+ val fileType = FileFactory.getFileType(databasePath)
+ if (FileFactory.isFileExist(databasePath, fileType)) {
+ val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
+ CarbonUtil.deleteFoldersAndFiles(dbPath)
+ }
+ }
+
+}
+
+
+object CarbonMetastoreTypes extends RegexParsers {
+ protected lazy val primitiveType: Parser[DataType] =
+ "string" ^^^ StringType |
+ "float" ^^^ FloatType |
+ "int" ^^^ IntegerType |
+ "tinyint" ^^^ ShortType |
+ "short" ^^^ ShortType |
+ "double" ^^^ DoubleType |
+ "long" ^^^ LongType |
+ "binary" ^^^ BinaryType |
+ "boolean" ^^^ BooleanType |
+ fixedDecimalType |
+ "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+ "varchar\\((\\d+)\\)".r ^^^ StringType |
+ "timestamp" ^^^ TimestampType
+
+ protected lazy val fixedDecimalType: Parser[DataType] =
+ "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+ case precision ~ scale =>
+ DecimalType(precision.toInt, scale.toInt)
+ }
+
+ protected lazy val arrayType: Parser[DataType] =
+ "array" ~> "<" ~> dataType <~ ">" ^^ {
+ case tpe => ArrayType(tpe)
+ }
+
+ protected lazy val mapType: Parser[DataType] =
+ "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+ case t1 ~ _ ~ t2 => MapType(t1, t2)
+ }
+
+ protected lazy val structField: Parser[StructField] =
+ "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+ case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+ }
+
+ protected lazy val structType: Parser[DataType] =
+ "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+ case fields => StructType(fields)
+ }
+
+ protected lazy val dataType: Parser[DataType] =
+ arrayType |
+ mapType |
+ structType |
+ primitiveType
+
+ def toDataType(metastoreType: String): DataType = {
+ parseAll(dataType, metastoreType) match {
+ case Success(result, _) => result
+ case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
+ }
+ }
+
+ def toMetastoreType(dt: DataType): String = {
+ dt match {
+ case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+ case StructType(fields) =>
+ s"struct<${
+ fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+ .mkString(",")
+ }>"
+ case StringType => "string"
+ case FloatType => "float"
+ case IntegerType => "int"
+ case ShortType => "tinyint"
+ case DoubleType => "double"
+ case LongType => "bigint"
+ case BinaryType => "binary"
+ case BooleanType => "boolean"
+ case DecimalType() => "decimal"
+ case TimestampType => "timestamp"
+ }
+ }
+}
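
[Editor's aside: a hedged usage sketch of CarbonMetastoreTypes, round-tripping metastore type strings through the grammar above; expected results are shown in comments:]

import org.apache.spark.sql.types._

val parsed = CarbonMetastoreTypes.toDataType("struct<name:string,scores:array<double>>")
// StructType(StructField("name", StringType, true), StructField("scores", ArrayType(DoubleType), true))

val rendered = CarbonMetastoreTypes.toMetastoreType(
  StructType(Seq(StructField("id", IntegerType), StructField("ts", TimestampType))))
// "struct<id:int,ts:timestamp>"
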
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
deleted file mode 100644
index d219bcb..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io._
-import java.util.{GregorianCalendar, UUID}
-
-import scala.Array.canBuildFrom
-import scala.collection.mutable.ArrayBuffer
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.RegexParsers
-
-import org.apache.spark
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.client.ClientInterface
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType
-import org.apache.carbondata.core.reader.ThriftReader
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.core.writer.ThriftWriter
-import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.lcm.locks.ZookeeperInit
-import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
-
-case class CarbonMetaData(dims: Seq[String],
- msrs: Seq[String],
- carbonTable: CarbonTable,
- dictionaryMap: DictionaryMap)
-
-case class TableMeta(carbonTableIdentifier: CarbonTableIdentifier, storePath: String,
- var carbonTable: CarbonTable, partitioner: Partitioner)
-
-object CarbonMetastoreCatalog {
-
- def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
- val createTBase = new ThriftReader.TBaseCreator() {
- override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
- new TableInfo()
- }
- }
- val thriftReader = new ThriftReader(schemaFilePath, createTBase)
- var tableInfo: TableInfo = null
- try {
- thriftReader.open()
- tableInfo = thriftReader.read().asInstanceOf[TableInfo]
- } finally {
- thriftReader.close()
- }
- tableInfo
- }
-
- def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
- val thriftWriter = new ThriftWriter(schemaFilePath, false)
- try {
- thriftWriter.open()
- thriftWriter.write(tableInfo);
- } finally {
- thriftWriter.close()
- }
- }
-
-}
-
-case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
- def get(name: String): Option[Boolean] = {
- dictionaryMap.get(name.toLowerCase)
- }
-}
-
-class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
- client: ClientInterface, queryId: String) extends HiveMetastoreCatalog(client, hiveContext) {
-
- @transient
- val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
-
- val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
- tableModifiedTimeStore
- .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
-
- val metadata = loadMetadata(storePath)
-
- def getTableCreationTime(databaseName: String, tableName: String): Long = {
- val tableMeta = metadata.tablesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
- val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime
- tableCreationTime
- }
-
- def lookupRelation1(dbName: Option[String],
- tableName: String)(sqlContext: SQLContext): LogicalPlan = {
- lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
- }
-
- def lookupRelation1(tableIdentifier: TableIdentifier,
- alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
- checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
- val tables = getTableFromMetadata(database, tableIdentifier.table)
- tables match {
- case Some(t) =>
- CarbonRelation(database, tableIdentifier.table,
- CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext)
- case None =>
- LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
- throw new NoSuchTableException
- }
- }
-
- /**
- * This method will search for a table in the catalog metadata
- *
- * @param database  database name to search in
- * @param tableName table name to look up
- * @return the matching TableMeta, if any
- */
- def getTableFromMetadata(database: String,
- tableName: String): Option[TableMeta] = {
- metadata.tablesMeta
- .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
- }
-
- def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
- checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
- val tables = metadata.tablesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
- tables.nonEmpty
- }
-
- def loadMetadata(metadataPath: String): MetaData = {
- val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val statistic = new QueryStatistic()
- // create the ZooKeeper instance once, if ZooKeeper is configured
- // as the carbon lock type.
- val zookeeperUrl: String = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
- if (zookeeperUrl != null) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
- ZookeeperInit.getInstance(zookeeperUrl)
- LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
- var configuredLockType = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.LOCK_TYPE)
- if (null == configuredLockType) {
- configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.LOCK_TYPE,
- configuredLockType)
- }
- }
-
- if (metadataPath == null) {
- return null
- }
- val fileType = FileFactory.getFileType(metadataPath)
- val metaDataBuffer = new ArrayBuffer[TableMeta]
- fillMetaData(metadataPath, fileType, metaDataBuffer)
- updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
- statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
- System.currentTimeMillis())
- recorder.recordStatisticsForDriver(statistic, queryId)
- MetaData(metaDataBuffer)
- }
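The ZooKeeper branch in loadMetadata only fires when CarbonCommonConstants.ZOOKEEPER_URL is present in the Hive conf, and it leaves an explicitly configured LOCK_TYPE alone. A minimal sketch of how a deployment might opt in (the quorum address is a placeholder):

    // Point Carbon at a ZooKeeper quorum so table locks are coordinated
    // across drivers; loadMetadata propagates this into CarbonProperties.
    hiveContext.setConf(CarbonCommonConstants.ZOOKEEPER_URL, "zk1:2181,zk2:2181")
    // Optional: pin the lock type instead of relying on the default fallback.
    CarbonProperties.getInstance.addProperty(
      CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)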
-
- private def fillMetaData(basePath: String, fileType: FileType,
- metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
- val databasePath = basePath // + "/schemas"
- try {
- if (FileFactory.isFileExist(databasePath, fileType)) {
- val file = FileFactory.getCarbonFile(databasePath, fileType)
- val databaseFolders = file.listFiles()
-
- databaseFolders.foreach(databaseFolder => {
- if (databaseFolder.isDirectory) {
- val dbName = databaseFolder.getName
- val tableFolders = databaseFolder.listFiles()
-
- tableFolders.foreach(tableFolder => {
- if (tableFolder.isDirectory) {
- val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
- tableFolder.getName, UUID.randomUUID().toString)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
- carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableName = tableFolder.getName
- val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-
- val createTBase = new ThriftReader.TBaseCreator() {
- override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
- new TableInfo()
- }
- }
- val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
- thriftReader.open()
- val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
- thriftReader.close()
-
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
- val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable =
- org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(tableUniqueName)
- metaDataBuffer += TableMeta(
- carbonTable.getCarbonTableIdentifier,
- storePath,
- carbonTable,
- // TODO: need to update the Database thrift to hold partitioner
- // information and reload it when required.
- Partitioner("org.apache.carbondata.spark.partition.api.impl." +
- "SampleDataPartitionerImpl",
- Array(""), 1, Array("")))
- }
- }
- })
- }
- })
- } else {
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
- }
- } catch {
- case s: java.io.FileNotFoundException =>
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
- }
- }
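fillMetaData assumes a two-level layout under the store path: one folder per database, one folder per table, with the Thrift schema resolved through CarbonStorePath. A sketch of the path resolution it depends on (store path and names are placeholders):

    // Each scan generates a fresh random table id, as fillMetaData does above.
    val identifier = new CarbonTableIdentifier("default", "t1", UUID.randomUUID().toString)
    val tablePath = CarbonStorePath.getCarbonTablePath("/tmp/carbonstore", identifier)
    // Expected shape: /tmp/carbonstore/default/t1/Metadata/schema
    val schemaFile = tablePath.getSchemaFilePath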
-
- /**
- * Prepares the Thrift schema from the wrapper TableInfo, writes it to the
- * schema file, and loads the CarbonTable from the wrapper TableInfo.
- */
- def createTableFromThrift(
- tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
- dbName: String, tableName: String, partitioner: Partitioner)
- (sqlContext: SQLContext): String = {
- if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
- sys.error(s"Table [$tableName] already exists under Database [$dbName]")
- }
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val thriftTableInfo = schemaConverter
- .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
- thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
- .add(schemaEvolutionEntry)
-
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
- tableInfo.getFactTable.getTableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
- tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(storePath)
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = TableMeta(
- carbonTableIdentifier,
- storePath,
- CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName),
- Partitioner("org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
- Array(""), 1, DistributionUtil.getNodeList(hiveContext.sparkContext)))
-
- val fileType = FileFactory.getFileType(schemaMetadataPath)
- if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
- FileFactory.mkdirs(schemaMetadataPath, fileType)
- }
- val thriftWriter = new ThriftWriter(schemaFilePath, false)
- thriftWriter.open()
- thriftWriter.write(thriftTableInfo)
- thriftWriter.close()
- metadata.tablesMeta += tableMeta
- logInfo(s"Table $tableName for Database $dbName created successfully.")
- LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- carbonTablePath.getPath
- }
-
- private def updateMetadataByWrapperTable(
- wrapperTableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo): Unit = {
-
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- wrapperTableInfo.getTableUniqueName)
- for (i <- metadata.tablesMeta.indices) {
- if (wrapperTableInfo.getTableUniqueName.equals(
- metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
- metadata.tablesMeta(i).carbonTable = carbonTable
- }
- }
- }
-
- def updateMetadataByThriftTable(schemaFilePath: String,
- tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
-
- tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis())
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- wrapperTableInfo.setStorePath(storePath)
- updateMetadataByWrapperTable(wrapperTableInfo)
- }
-
- /**
- * Shows all tables for the given database.
- */
- def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
-
- val dbName =
- databaseName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
- checkSchemasModifiedTimeAndReloadTables()
- metadata.tablesMeta.filter { c =>
- c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName)
- }.map { c => (c.carbonTableIdentifier.getTableName, false) }
- }
-
- def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
- val dbName = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
- val tableName = tableIdentifier.table
-
- val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
- val fileType = FileFactory.getFileType(tablePath)
- FileFactory.isFileExist(tablePath, fileType)
- }
-
- def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
- (sqlContext: SQLContext) {
- val dbName = tableIdentifier.database.get
- val tableName = tableIdentifier.table
-
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
-
- val fileType = FileFactory.getFileType(metadataFilePath)
-
- if (FileFactory.isFileExist(metadataFilePath, fileType)) {
- // while dropping, refresh the schema modified time so that any change made
- // from another beeline session is picked up.
- checkSchemasModifiedTimeAndReloadTables
- val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
- CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
- tableIdentifier.table)
- metadataToBeRemoved match {
- case Some(tableMeta) =>
- metadata.tablesMeta -= tableMeta
- org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
- .removeTable(dbName + "_" + tableName)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- case None =>
- logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
- }
- CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
- // discard cached table info in cachedDataSourceTables
- sqlContext.catalog.refreshTable(tableIdentifier)
- }
- }
-
- private def getTimestampFileAndType(databaseName: String, tableName: String) = {
- val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
- val timestampFileType = FileFactory.getFileType(timestampFile)
- (timestampFile, timestampFileType)
- }
-
- /**
- * This method will put the updated timestamp of the schema file in the
- * table modified time store map
- *
- * @param timeStamp last modified time of the schema file
- */
- def updateSchemasUpdatedTime(timeStamp: Long) {
- tableModifiedTimeStore.put("default", timeStamp)
- }
-
- /**
- * This method will read the last modified time of the schema timestamp file
- *
- * @param databaseName database the timestamp is read for (currently unused)
- * @param tableName    table the timestamp is read for (currently unused)
- * @return last modified time of the timestamp file, or the current time if absent
- */
- def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
- } else {
- System.currentTimeMillis()
- }
- }
-
- /**
- * This method will create the schema timestamp file if it is absent and
- * update its modification time
- *
- * @param databaseName database the touch is audited for
- * @param tableName    table the touch is audited for
- * @return the new modification timestamp
- */
- def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
- FileFactory.createNewFile(timestampFile, timestampFileType)
- }
- val systemTime = System.currentTimeMillis()
- FileFactory.getCarbonFile(timestampFile, timestampFileType)
- .setLastModifiedTime(systemTime)
- systemTime
- }
-
- def checkSchemasModifiedTimeAndReloadTables() {
- val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- if (FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime !=
- tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME)) {
- refreshCache()
- }
- }
- }
-
- def refreshCache() {
- metadata.tablesMeta = loadMetadata(storePath).tablesMeta
- }
-
- def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
- var schemaLastUpdatedTime = System.currentTimeMillis
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
- .getLastModifiedTime
- }
- schemaLastUpdatedTime
- }
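Taken together, the timestamp helpers implement a cheap cross-session invalidation protocol: every schema mutation touches one store-wide modification-time file, and every lookup compares that file's last-modified time against the cached value, reloading all table metadata on mismatch. Condensed to its two sides:

    // Writer side (create/drop table): bump the shared timestamp file.
    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))

    // Reader side (every lookup): reload the store if another session bumped it.
    checkSchemasModifiedTimeAndReloadTables()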
-
- def createDatabaseDirectory(dbName: String) {
- val databasePath = storePath + File.separator + dbName
- val fileType = FileFactory.getFileType(databasePath)
- FileFactory.mkdirs(databasePath, fileType)
- }
-
- def dropDatabaseDirectory(dbName: String) {
- val databasePath = storePath + File.separator + dbName
- val fileType = FileFactory.getFileType(databasePath)
- if (FileFactory.isFileExist(databasePath, fileType)) {
- val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
- CarbonUtil.deleteFoldersAndFiles(dbPath)
- }
- }
-
-}
-
-
-object CarbonMetastoreTypes extends RegexParsers {
- protected lazy val primitiveType: Parser[DataType] =
- "string" ^^^ StringType |
- "float" ^^^ FloatType |
- "int" ^^^ IntegerType |
- "tinyint" ^^^ ShortType |
- "short" ^^^ ShortType |
- "double" ^^^ DoubleType |
- "long" ^^^ LongType |
- "binary" ^^^ BinaryType |
- "boolean" ^^^ BooleanType |
- fixedDecimalType |
- "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
- "varchar\\((\\d+)\\)".r ^^^ StringType |
- "timestamp" ^^^ TimestampType
-
- protected lazy val fixedDecimalType: Parser[DataType] =
- "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
- case precision ~ scale =>
- DecimalType(precision.toInt, scale.toInt)
- }
-
- protected lazy val arrayType: Parser[DataType] =
- "array" ~> "<" ~> dataType <~ ">" ^^ {
- case tpe => ArrayType(tpe)
- }
-
- protected lazy val mapType: Parser[DataType] =
- "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
- case t1 ~ _ ~ t2 => MapType(t1, t2)
- }
-
- protected lazy val structField: Parser[StructField] =
- "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
- case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
- }
-
- protected lazy val structType: Parser[DataType] =
- "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
- case fields => StructType(fields)
- }
-
- protected lazy val dataType: Parser[DataType] =
- arrayType |
- mapType |
- structType |
- primitiveType
-
- def toDataType(metastoreType: String): DataType = {
- parseAll(dataType, metastoreType) match {
- case Success(result, _) => result
- case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
- }
- }
-
- def toMetastoreType(dt: DataType): String = {
- dt match {
- case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
- case StructType(fields) =>
- s"struct<${
- fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
- .mkString(",")
- }>"
- case StringType => "string"
- case FloatType => "float"
- case IntegerType => "int"
- case ShortType => "tinyint"
- case DoubleType => "double"
- case LongType => "bigint"
- case BinaryType => "binary"
- case BooleanType => "boolean"
- case DecimalType() => "decimal"
- case TimestampType => "timestamp"
- }
- }
-}
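CarbonMetastoreTypes round-trips between Hive metastore type strings and Spark SQL DataTypes, though not losslessly: both "short" and "tinyint" parse to ShortType, and decimal precision/scale survives parsing but prints back as plain "decimal". A quick sketch:

    import org.apache.spark.sql.types._

    CarbonMetastoreTypes.toDataType("struct<name:string,scores:array<int>>")
    // StructType(StructField(name,StringType,true),
    //            StructField(scores,ArrayType(IntegerType,true),true))
    CarbonMetastoreTypes.toDataType("decimal(10,3)")          // DecimalType(10,3)
    CarbonMetastoreTypes.toMetastoreType(DecimalType(10, 3))  // "decimal"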
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 1e944f7..44dbd6b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -34,10 +34,11 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
import org.apache.spark.sql.hive.execution.command._
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.types.IntegerType
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
@@ -121,7 +122,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
scan
}
- if (projectList.map(_.toAttribute) == scan.attributesRaw &&
+ if (projectList.map(_.toAttribute) == scan.columnProjection &&
projectSet.size == projectList.size &&
filterSet.subsetOf(projectSet)) {
// copied from spark pruneFilterProjectRaw
@@ -151,9 +152,9 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
predicates,
useUnsafeCoversion = false)(sqlContext)
projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
- val updatedAttrs = scan.attributesRaw.map(attr =>
+ val updatedAttrs = scan.columnProjection.map(attr =>
updateDataType(attr.asInstanceOf[AttributeReference], relation, projectExprsNeedToDecode))
- scan.attributesRaw = updatedAttrs
+ scan.columnProjection = updatedAttrs
if (projectExprsNeedToDecode.size() > 0
&& isDictionaryEncoded(projectExprsNeedToDecode.asScala.toSeq, relation)) {
val decoder = getCarbonDecoder(logicalRelation,
@@ -235,15 +236,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
object DDLStrategies extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case DropTable(tableName, ifNotExists)
- if (CarbonEnv.getInstance(sqlContext).carbonCatalog
- .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext)) =>
+ if CarbonEnv.get.carbonMetastore
+ .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
val identifier = toTableIdentifier(tableName.toLowerCase)
ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
case ShowLoadsCommand(databaseName, table, limit) =>
ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
options, isOverwriteExist, inputSqlString, dataFrame) =>
- val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val isCarbonTable = CarbonEnv.get.carbonMetastore
.tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
if (isCarbonTable || options.nonEmpty) {
ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
@@ -252,7 +253,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil
}
case alterTable@AlterTableCompaction(altertablemodel) =>
- val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val isCarbonTable = CarbonEnv.get.carbonMetastore
.tableExists(TableIdentifier(altertablemodel.tableName,
altertablemodel.dbName))(sqlContext)
if (isCarbonTable) {
@@ -286,7 +287,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
case e: Exception => ExecutedCommand(d) :: Nil
}
case DescribeFormattedCommand(sql, tblIdentifier) =>
- val isTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ val isTable = CarbonEnv.get.carbonMetastore
.tableExists(tblIdentifier)(sqlContext)
if (isTable) {
val describe =