Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/25 17:37:42 UTC
[05/15] carbondata git commit: [CARBONDATA-1284] Implement hive-based schema storage in carbon
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
new file mode 100644
index 0000000..f245df6
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import java.util.LinkedHashSet
+
+import scala.Array.canBuildFrom
+import scala.collection.JavaConverters._
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.merger.TableMeta
+
+/**
+ * Represents logical plan for one carbon table
+ */
+case class CarbonRelation(
+ databaseName: String,
+ tableName: String,
+ var metaData: CarbonMetaData,
+ tableMeta: TableMeta)
+ extends LeafNode with MultiInstanceRelation {
+
+ def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
+ childDim.getDataType.toString.toLowerCase match {
+ case "array" => s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:array<${ getArrayChildren(childDim.getColName) }>"
+ case "struct" => s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:struct<${ getStructChildren(childDim.getColName) }>"
+ case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+ }
+ }
+
+ def getArrayChildren(dimName: String): String = {
+ metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
+ childDim.getDataType.toString.toLowerCase match {
+ case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
+ case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
+ case dType => addDecimalScaleAndPrecision(childDim, dType)
+ }
+ }).mkString(",")
+ }
+
+ def getStructChildren(dimName: String): String = {
+ metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
+ childDim.getDataType.toString.toLowerCase match {
+ case "array" => s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:array<${ getArrayChildren(childDim.getColName) }>"
+ case "struct" => s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:struct<${ metaData.carbonTable.getChildren(childDim.getColName)
+ .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
+ }>"
+ case dType => s"${ childDim.getColName
+ .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
+ }
+ }).mkString(",")
+ }
+
+ override def newInstance(): LogicalPlan = {
+ CarbonRelation(databaseName, tableName, metaData, tableMeta)
+ .asInstanceOf[this.type]
+ }
+
+ val dimensionsAttr = {
+ val sett = new LinkedHashSet(
+ tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
+ .asScala.asJava)
+ sett.asScala.toSeq.map(dim => {
+ val dimval = metaData.carbonTable
+ .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+ val output: DataType = dimval.getDataType
+ .toString.toLowerCase match {
+ case "array" =>
+ CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
+ case "struct" =>
+ CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
+ case dType =>
+ val dataType = addDecimalScaleAndPrecision(dimval, dType)
+ CarbonMetastoreTypes.toDataType(dataType)
+ }
+
+ AttributeReference(
+ dim.getColName,
+ output,
+ nullable = true)()
+ })
+ }
+
+ val measureAttr = {
+ val factTable = tableMeta.carbonTable.getFactTableName
+ new LinkedHashSet(
+ tableMeta.carbonTable.
+ getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
+ asScala.asJava).asScala.toSeq
+ .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
+ metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
+ .toLowerCase match {
+ case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
+ case others => others
+ }),
+ nullable = true)())
+ }
+
+ override val output = {
+ val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+ .asScala
+ // convert each column to Attribute
+ columns.filter(!_.isInvisible).map { column =>
+ if (column.isDimension()) {
+ val output: DataType = column.getDataType.toString.toLowerCase match {
+ case "array" =>
+ CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
+ case "struct" =>
+ CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
+ case dType =>
+ val dataType = addDecimalScaleAndPrecision(column, dType)
+ CarbonMetastoreTypes.toDataType(dataType)
+ }
+ AttributeReference(column.getColName, output, nullable = true )(
+ qualifier = Option(tableName + "." + column.getColName))
+ } else {
+ val output = CarbonMetastoreTypes.toDataType {
+ column.getDataType.toString
+ .toLowerCase match {
+ case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
+ .getColumnSchema.getScale + ")"
+ case others => others
+ }
+ }
+ AttributeReference(column.getColName, output, nullable = true)(
+ qualifier = Option(tableName + "." + column.getColName))
+ }
+ }
+ }
+
+ def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
+ var dType = dataType
+ if (dimval.getDataType == DECIMAL) {
+ dType +=
+ "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
+ }
+ dType
+ }
+
+ // TODO: Use data from the footers.
+ override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case p: CarbonRelation =>
+ p.databaseName == databaseName && p.output == output && p.tableName == tableName
+ case _ => false
+ }
+ }
+
+ def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
+ var dType = dataType
+ if (dimval.getDataType == DECIMAL) {
+ dType +=
+ "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
+ }
+ dType
+ }
+
+ private var tableStatusLastUpdateTime = 0L
+
+ private var sizeInBytesLocalValue = 0L
+
+ def sizeInBytes: Long = {
+ val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
+ tableMeta.carbonTable.getAbsoluteTableIdentifier)
+
+ if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
+ val tablePath = CarbonStorePath.getCarbonTablePath(
+ tableMeta.storePath,
+ tableMeta.carbonTableIdentifier).getPath
+ val fileType = FileFactory.getFileType(tablePath)
+ if(FileFactory.isFileExist(tablePath, fileType)) {
+ tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
+ sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+ }
+ }
+ sizeInBytesLocalValue
+ }
+
+}
+
+object CarbonMetastoreTypes extends RegexParsers {
+ protected lazy val primitiveType: Parser[DataType] =
+ "string" ^^^ StringType |
+ "float" ^^^ FloatType |
+ "int" ^^^ IntegerType |
+ "tinyint" ^^^ ShortType |
+ "short" ^^^ ShortType |
+ "double" ^^^ DoubleType |
+ "long" ^^^ LongType |
+ "binary" ^^^ BinaryType |
+ "boolean" ^^^ BooleanType |
+ fixedDecimalType |
+ "decimal" ^^^ "decimal" ^^^ DecimalType(10, 0) |
+ "varchar\\((\\d+)\\)".r ^^^ StringType |
+ "date" ^^^ DateType |
+ "timestamp" ^^^ TimestampType
+
+ protected lazy val fixedDecimalType: Parser[DataType] =
+ "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+ case precision ~ scale =>
+ DecimalType(precision.toInt, scale.toInt)
+ }
+
+ protected lazy val arrayType: Parser[DataType] =
+ "array" ~> "<" ~> dataType <~ ">" ^^ {
+ case tpe => ArrayType(tpe)
+ }
+
+ protected lazy val mapType: Parser[DataType] =
+ "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+ case t1 ~ _ ~ t2 => MapType(t1, t2)
+ }
+
+ protected lazy val structField: Parser[StructField] =
+ "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+ case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+ }
+
+ protected lazy val structType: Parser[DataType] =
+ "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+ case fields => StructType(fields)
+ }
+
+ protected lazy val dataType: Parser[DataType] =
+ arrayType |
+ mapType |
+ structType |
+ primitiveType
+
+ def toDataType(metastoreType: String): DataType = {
+ parseAll(dataType, metastoreType) match {
+ case Success(result, _) => result
+ case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
+ }
+ }
+
+ def toMetastoreType(dt: DataType): String = {
+ dt match {
+ case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+ case StructType(fields) =>
+ s"struct<${
+ fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+ .mkString(",")
+ }>"
+ case StringType => "string"
+ case FloatType => "float"
+ case IntegerType => "int"
+ case ShortType => "tinyint"
+ case DoubleType => "double"
+ case LongType => "bigint"
+ case BinaryType => "binary"
+ case BooleanType => "boolean"
+ case DecimalType() => "decimal"
+ case TimestampType => "timestamp"
+ case DateType => "date"
+ }
+ }
+}
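
For reference, the CarbonMetastoreTypes object added above round-trips between Spark SQL DataTypes and hive metastore type strings. A minimal usage sketch (illustrative only, not part of this commit; it assumes the object above is on the classpath):

    import org.apache.spark.sql.types._

    // Parse a metastore type string into a Spark SQL DataType.
    val parsed: DataType =
      CarbonMetastoreTypes.toDataType("struct<a:int,b:array<decimal(10,2)>>")
    // => StructType with an IntegerType field "a" and an
    //    ArrayType(DecimalType(10, 2)) field "b"

    // Render a Spark SQL DataType back into a metastore type string.
    val rendered: String =
      CarbonMetastoreTypes.toMetastoreType(ArrayType(StringType))
    // => "array<string>"
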
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 87717fb..01bdc4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -65,7 +65,7 @@ object AlterTableUtil {
val acquiredLocks = ListBuffer[ICarbonLock]()
try {
locksToBeAcquired.foreach { lock =>
- acquiredLocks += CarbonLockUtil.getLockObject(table, lock)
+ acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
}
acquiredLocks.toList
} catch {
@@ -133,6 +133,7 @@ object AlterTableUtil {
val tableName = carbonTable.getFactTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
.updateTableSchema(carbonTable.getCarbonTableIdentifier,
+ carbonTable.getCarbonTableIdentifier,
thriftTable,
schemaEvolutionEntry,
carbonTable.getStorePath)(sparkSession)
@@ -185,14 +186,18 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath,
- newCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newCarbonTableIdentifier)
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val tableMetadataFile = carbonTablePath.getPath
val fileType = FileFactory.getFileType(tableMetadataFile)
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
- .carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val tableInfo = if (metastore.isReadFromHiveMetaStore) {
+ // In the hive metastore case, first update the carbon schema inside the old table only.
+ metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(storePath,
+ new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession)
+ } else {
+ metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ }
val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -200,14 +205,9 @@ object AlterTableUtil {
FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
.renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
oldTableIdentifier.table)
- val tableIdentifier = new CarbonTableIdentifier(database,
- oldTableIdentifier.table,
- tableId)
- CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
- tableInfo,
- storePath)(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .removeTableFromMetadata(database, newTableName)
+ val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
+ metastore.revertTableSchema(tableIdentifier, tableInfo, storePath)(sparkSession)
+ metastore.removeTableFromMetadata(database, newTableName)
}
}
}
@@ -222,22 +222,21 @@ object AlterTableUtil {
*/
def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
LOGGER.info(s"Reverting changes for $dbName.$tableName")
val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
thriftTable.fact_table.table_columns.removeAll(addedSchemas)
- CarbonEnv.getInstance(sparkSession).carbonMetastore
+ metastore
.revertTableSchema(carbonTable.getCarbonTableIdentifier,
thriftTable, carbonTable.getStorePath)(sparkSession)
}
@@ -253,14 +252,13 @@ object AlterTableUtil {
*/
def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -273,7 +271,7 @@ object AlterTableUtil {
}
}
}
- CarbonEnv.getInstance(sparkSession).carbonMetastore
+ metastore
.revertTableSchema(carbonTable.getCarbonTableIdentifier,
thriftTable, carbonTable.getStorePath)(sparkSession)
}
@@ -289,14 +287,13 @@ object AlterTableUtil {
*/
def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -312,7 +309,7 @@ object AlterTableUtil {
}
}
}
- CarbonEnv.getInstance(sparkSession).carbonMetastore
+ metastore
.revertTableSchema(carbonTable.getCarbonTableIdentifier,
thriftTable, carbonTable.getStorePath)(sparkSession)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 74e11f1..645081f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -30,8 +31,9 @@ object CleanFiles {
def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
storePath: String): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
- .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+ lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable)
}
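
The change above in CleanFiles, repeated verbatim in DeleteSegmentByDate, DeleteSegmentById, and ShowSegments below, swaps getTableFromMetadata (which returned null for missing tables) for a lookupRelation call that resolves the table through the metastore. A hypothetical shared helper capturing the repeated lookup (the commit itself inlines it at each call site):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // Hypothetical helper; not part of this commit.
    def resolveCarbonTable(spark: SparkSession, dbName: String,
        tableName: String): CarbonTable = {
      CarbonEnv.getInstance(spark).carbonMetastore
        .lookupRelation(Some(dbName), tableName)(spark)
        .asInstanceOf[CarbonRelation]
        .tableMeta.carbonTable
    }
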
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 3dffb42..f67a5ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -16,7 +16,8 @@
*/
package org.apache.spark.util
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -29,8 +30,9 @@ object DeleteSegmentByDate {
def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
dateValue: String): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
- .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+ lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 35afa28..bbf386e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -33,8 +34,9 @@ object DeleteSegmentById {
def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
segmentIds: Seq[String]): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
- .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+ lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 07dfcc1..d5788ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -21,6 +21,7 @@ import java.text.SimpleDateFormat
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -30,8 +31,9 @@ object ShowSegments {
def showSegments(spark: SparkSession, dbName: String, tableName: String,
limit: Option[String]): Seq[Row] = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
- .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+ lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
CarbonStore.showSegments(dbName, tableName, limit, carbonTable.getMetaDataFilepath)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 6afa25b..61589de 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -22,7 +22,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -60,6 +62,14 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+ // Create table and metadata folders if they do not exist
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+ val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+ val fileType = FileFactory.getFileType(metadataDirectoryPath)
+ if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+ FileFactory.mkdirs(metadataDirectoryPath, fileType)
+ }
carbonLoadModel
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 7f6e88a..4746ecf 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -173,6 +175,14 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
carbonLoadModel.setMaxColumns("100")
+ // Create table and metadata folders if they do not exist
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+ val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+ val fileType = FileFactory.getFileType(metadataDirectoryPath)
+ if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+ FileFactory.mkdirs(metadataDirectoryPath, fileType)
+ }
carbonLoadModel
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 316881d..f46282d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -195,6 +195,7 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
}
test("test create table with complex datatype") {
+ sql("drop table if exists create_source")
sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='create_source')")
sql("drop table create_source")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 8cdcd26..1788ccb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -256,19 +256,18 @@ public class CarbonCompactionUtil {
/**
* This will check if any compaction request has been received for any table.
*
- * @param tableMetas
+ * @param carbonTables
* @return
*/
- public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+ public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
List<CarbonTableIdentifier> skipList) {
- for (TableMeta table : tableMetas) {
- CarbonTable ctable = table.carbonTable;
+ for (CarbonTable ctable : carbonTables) {
String metadataPath = ctable.getMetaDataFilepath();
// check for the compaction required file and at the same time exclude the tables which are
// present in the skip list.
if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
- .contains(table.carbonTableIdentifier)) {
- return table;
+ .contains(ctable.getCarbonTableIdentifier())) {
+ return ctable;
}
}
return null;
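
With this change the compaction scheduler passes CarbonTable instances around directly instead of TableMeta wrappers. A Scala sketch of a polling loop over the new signature (hypothetical caller; carbonTables is assumed to be the Array[CarbonTable] of candidate tables):

    import java.util.{ArrayList => JArrayList}

    val skipList = new JArrayList[CarbonTableIdentifier]()
    var next = CarbonCompactionUtil.getNextTableToCompact(carbonTables, skipList)
    while (next != null) {
      // Submit a compaction for `next`, then exclude it on the following pass.
      skipList.add(next.getCarbonTableIdentifier)
      next = CarbonCompactionUtil.getNextTableToCompact(carbonTables, skipList)
    }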