You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/25 17:37:42 UTC

[05/15] carbondata git commit: [CARBONDATA-1284]Implement hive based schema storage in carbon

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
new file mode 100644
index 0000000..f245df6
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import java.util.LinkedHashSet
+
+import scala.Array.canBuildFrom
+import scala.collection.JavaConverters._
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.merger.TableMeta
+
+/**
+ * Logical plan leaf node for one CarbonData table. The Spark attributes (`output`,
+ * `dimensionsAttr`, `measureAttr`) are built by rendering each carbon column as a Hive-style
+ * type string and parsing that string with [[CarbonMetastoreTypes]].
+ *
+ * @param databaseName database that owns the table
+ * @param tableName    name of the table
+ * @param metaData     its `carbonTable` resolves column types and complex-column children
+ * @param tableMeta    its `carbonTable` lists the columns; also locates the table directory
+ */
+case class CarbonRelation(
+    databaseName: String,
+    tableName: String,
+    var metaData: CarbonMetaData,
+    tableMeta: TableMeta)
+  extends LeafNode with MultiInstanceRelation {
+
+  /** Lower-cased carbon type name; Locale.ROOT makes it independent of the JVM locale. */
+  private def typeNameOf(column: CarbonColumn): String =
+    column.getDataType.toString.toLowerCase(java.util.Locale.ROOT)
+
+  /**
+   * Renders a child of the struct column `dimName` as a `name:type` field, where the name is
+   * the child's column name minus its first `dimName.length + 1` characters. Decimal children
+   * carry precision and scale, so nested decimals keep their declared precision.
+   */
+  def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
+    val fieldName = childDim.getColName.substring(dimName.length + 1)
+    val fieldType = typeNameOf(childDim) match {
+      case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
+      case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
+      case dType => addDecimalScaleAndPrecision(childDim, dType)
+    }
+    s"$fieldName:$fieldType"
+  }
+
+  /** Comma separated element type strings of the array column `dimName` (no field names). */
+  def getArrayChildren(dimName: String): String = {
+    metaData.carbonTable.getChildren(dimName).asScala.map { childDim =>
+      typeNameOf(childDim) match {
+        case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
+        case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
+        case dType => addDecimalScaleAndPrecision(childDim, dType)
+      }
+    }.mkString(",")
+  }
+
+  /** Comma separated `name:type` fields of the struct column `dimName`. */
+  def getStructChildren(dimName: String): String = {
+    metaData.carbonTable.getChildren(dimName).asScala
+      .map(childDim => recursiveMethod(dimName, childDim))
+      .mkString(",")
+  }
+
+  /** Builds a new relation over the same table; the copy re-creates its attribute vals. */
+  override def newInstance(): LogicalPlan = {
+    CarbonRelation(databaseName, tableName, metaData, tableMeta)
+  }
+
+  /**
+   * Spark type of a dimension: `column` supplies the carbon type and decimal precision/scale,
+   * `colName` is the name under which the children of a complex column are looked up.
+   */
+  private def dimensionSparkType(column: CarbonColumn, colName: String): DataType = {
+    val typeString = typeNameOf(column) match {
+      case "array" => s"array<${ getArrayChildren(colName) }>"
+      case "struct" => s"struct<${ getStructChildren(colName) }>"
+      case dType => addDecimalScaleAndPrecision(column, dType)
+    }
+    CarbonMetastoreTypes.toDataType(typeString)
+  }
+
+  val dimensionsAttr = {
+    val sett = new LinkedHashSet(
+      tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
+        .asScala.asJava)
+    sett.asScala.toSeq.map { dim =>
+      // the type is read from the metaData table, the name from the tableMeta table
+      val dimval = metaData.carbonTable
+        .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+      AttributeReference(dim.getColName, dimensionSparkType(dimval, dim.getColName),
+        nullable = true)()
+    }
+  }
+
+  val measureAttr = {
+    val factTable = tableMeta.carbonTable.getFactTableName
+    val measures = new LinkedHashSet(
+      tableMeta.carbonTable.getMeasureByTableName(factTable).asScala.asJava)
+    measures.asScala.toSeq.map { x =>
+      val typeName = metaData.carbonTable.getMeasureByName(factTable, x.getColName)
+        .getDataType.toString.toLowerCase(java.util.Locale.ROOT)
+      val typeString = typeName match {
+        case "decimal" => s"decimal(${ x.getPrecision },${ x.getScale })"
+        case others => others
+      }
+      AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(typeString),
+        nullable = true)()
+    }
+  }
+
+  override val output = {
+    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+      .asScala
+    // convert each visible column to an Attribute
+    columns.filter(!_.isInvisible).map { column =>
+      val sparkType = if (column.isDimension()) {
+        dimensionSparkType(column, column.getColName)
+      } else {
+        val typeString = typeNameOf(column) match {
+          case "decimal" =>
+            val schema = column.getColumnSchema
+            s"decimal(${ schema.getPrecision },${ schema.getScale })"
+          case others => others
+        }
+        CarbonMetastoreTypes.toDataType(typeString)
+      }
+      AttributeReference(column.getColName, sparkType, nullable = true)(
+        qualifier = Option(tableName + "." + column.getColName))
+    }
+  }
+
+  /**
+   * Appends `(precision,scale)` to `dataType` when `dimval` is a decimal column; any other
+   * type name is returned unchanged.
+   */
+  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
+    if (dimval.getDataType == DECIMAL) {
+      val schema = dimval.getColumnSchema
+      s"$dataType(${ schema.getPrecision },${ schema.getScale })"
+    } else {
+      dataType
+    }
+  }
+
+  // TODO: Use data from the footers.
+  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+
+  /** Equal when database name, table name and output match; metadata is not compared. */
+  override def equals(other: Any): Boolean = other match {
+    case p: CarbonRelation =>
+      p.databaseName == databaseName && p.output == output && p.tableName == tableName
+    case _ => false
+  }
+
+  /** Hashes only name fields that `equals` compares, so equal relations hash alike. */
+  override def hashCode(): Int = (databaseName, tableName).hashCode()
+
+  /** Dimension-typed overload kept for existing callers; delegates to the column overload. */
+  def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
+    addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType)
+  }
+
+  private var tableStatusLastUpdateTime = 0L
+
+  private var sizeInBytesLocalValue = 0L
+
+  /**
+   * Size in bytes of the table directory. It is re-measured only when the table status
+   * modification time differs from the one recorded at the last measurement and the table
+   * path exists; otherwise the previously stored value (initially 0) is returned.
+   */
+  def sizeInBytes: Long = {
+    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
+      tableMeta.carbonTable.getAbsoluteTableIdentifier)
+    if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
+      val tablePath = CarbonStorePath.getCarbonTablePath(
+        tableMeta.storePath,
+        tableMeta.carbonTableIdentifier).getPath
+      if (FileFactory.isFileExist(tablePath, FileFactory.getFileType(tablePath))) {
+        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
+        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+      }
+    }
+    sizeInBytesLocalValue
+  }
+}
+
+/**
+ * Converts between Hive-style type strings such as `array<struct<a:int,b:decimal(10,2)>>`
+ * and Spark SQL [[DataType]]s.
+ */
+object CarbonMetastoreTypes extends RegexParsers {
+  // `|` keeps the first alternative that succeeds, so `fixedDecimalType` precedes the bare
+  // "decimal" literal; "bigint" is accepted because `toMetastoreType` emits it for LongType.
+  protected lazy val primitiveType: Parser[DataType] =
+    "string" ^^^ StringType |
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ShortType |
+    "short" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "long" ^^^ LongType |
+    "bigint" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |
+    "decimal" ^^^ DecimalType(10, 0) |
+    "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "date" ^^^ DateType |
+    "timestamp" ^^^ TimestampType
+
+  protected lazy val fixedDecimalType: Parser[DataType] =
+    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+      case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
+    }
+
+  protected lazy val arrayType: Parser[DataType] =
+    "array" ~> "<" ~> dataType <~ ">" ^^ (tpe => ArrayType(tpe))
+
+  protected lazy val mapType: Parser[DataType] =
+    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { case t1 ~ _ ~ t2 => MapType(t1, t2) }
+
+  protected lazy val structField: Parser[StructField] =
+    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+    }
+
+  protected lazy val structType: Parser[DataType] =
+    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ (fields => StructType(fields))
+
+  protected lazy val dataType: Parser[DataType] =
+    arrayType | mapType | structType | primitiveType
+
+  /** Parses a complete type string into a Spark type; otherwise fails via `sys.error`. */
+  def toDataType(metastoreType: String): DataType = {
+    parseAll(dataType, metastoreType) match {
+      case Success(result, _) => result
+      case _: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
+    }
+  }
+
+  /**
+   * Renders a Spark type as a Hive-style type string; decimals keep precision and scale.
+   */
+  def toMetastoreType(dt: DataType): String = dt match {
+    case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+    case MapType(keyType, valueType, _) =>
+      s"map<${ toMetastoreType(keyType) },${ toMetastoreType(valueType) }>"
+    case StructType(fields) =>
+      val fieldTypes = fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+      s"struct<${ fieldTypes.mkString(",") }>"
+    case StringType => "string"
+    case FloatType => "float"
+    case IntegerType => "int"
+    case ShortType => "tinyint"
+    case DoubleType => "double"
+    case LongType => "bigint"
+    case BinaryType => "binary"
+    case BooleanType => "boolean"
+    case d: DecimalType => s"decimal(${ d.precision },${ d.scale })"
+    case TimestampType => "timestamp"
+    case DateType => "date"
+    case other => sys.error(s"Unsupported dataType: $other")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 87717fb..01bdc4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -65,7 +65,7 @@ object AlterTableUtil {
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
-        acquiredLocks += CarbonLockUtil.getLockObject(table, lock)
+        acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
       }
       acquiredLocks.toList
     } catch {
@@ -133,6 +133,7 @@ object AlterTableUtil {
     val tableName = carbonTable.getFactTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
       .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+        carbonTable.getCarbonTableIdentifier,
         thriftTable,
         schemaEvolutionEntry,
         carbonTable.getStorePath)(sparkSession)
@@ -185,14 +186,18 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath,
-      newCarbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newCarbonTableIdentifier)
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val tableMetadataFile = carbonTablePath.getPath
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
-        .carbonMetastore
-        .readSchemaFile(tableMetadataFile)
+      val tableInfo = if (metastore.isReadFromHiveMetaStore) {
+        // In case of hive metastore we first update the carbonschema inside old table only.
+        metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(storePath,
+          new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession)
+      } else {
+        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      }
       val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
       val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
       if (updatedTime == timeStamp) {
@@ -200,14 +205,9 @@ object AlterTableUtil {
         FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        oldTableIdentifier.table)
-        val tableIdentifier = new CarbonTableIdentifier(database,
-          oldTableIdentifier.table,
-          tableId)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
-          tableInfo,
-          storePath)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .removeTableFromMetadata(database, newTableName)
+        val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
+        metastore.revertTableSchema(tableIdentifier, tableInfo, storePath)(sparkSession)
+        metastore.removeTableFromMetadata(database, newTableName)
       }
     }
   }
@@ -222,22 +222,21 @@ object AlterTableUtil {
    */
   def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = metastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
 
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
       carbonTable.getCarbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
-    val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .readSchemaFile(tableMetadataFile)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
       LOGGER.info(s"Reverting changes for $dbName.$tableName")
       val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
+      metastore
         .revertTableSchema(carbonTable.getCarbonTableIdentifier,
           thriftTable, carbonTable.getStorePath)(sparkSession)
     }
@@ -253,14 +252,13 @@ object AlterTableUtil {
    */
   def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = metastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
       carbonTable.getCarbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
-    val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .readSchemaFile(tableMetadataFile)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -273,7 +271,7 @@ object AlterTableUtil {
           }
         }
       }
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
+      metastore
         .revertTableSchema(carbonTable.getCarbonTableIdentifier,
           thriftTable, carbonTable.getStorePath)(sparkSession)
     }
@@ -289,14 +287,13 @@ object AlterTableUtil {
    */
   def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = metastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
       carbonTable.getCarbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
-    val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .readSchemaFile(tableMetadataFile)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -312,7 +309,7 @@ object AlterTableUtil {
           }
         }
       }
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
+      metastore
         .revertTableSchema(carbonTable.getCarbonTableIdentifier,
           thriftTable, carbonTable.getStorePath)(sparkSession)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 74e11f1..645081f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -30,8 +31,9 @@ object CleanFiles {
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
       storePath: String): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
-      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 3dffb42..f67a5ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -16,7 +16,8 @@
  */
  package org.apache.spark.util
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+ import org.apache.spark.sql.{CarbonEnv, SparkSession}
+ import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -29,8 +30,9 @@ object DeleteSegmentByDate {
   def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
       dateValue: String): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
-      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 35afa28..bbf386e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -33,8 +34,9 @@ object DeleteSegmentById {
   def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
       segmentIds: Seq[String]): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
-      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 07dfcc1..d5788ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -21,6 +21,7 @@ import java.text.SimpleDateFormat
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -30,8 +31,9 @@ object ShowSegments {
   def showSegments(spark: SparkSession, dbName: String, tableName: String,
       limit: Option[String]): Seq[Row] = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
-      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+      lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.showSegments(dbName, tableName, limit, carbonTable.getMetaDataFilepath)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 6afa25b..61589de 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -22,7 +22,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
 
@@ -60,6 +62,14 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+    // Create table and metadata folders if not exist
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val fileType = FileFactory.getFileType(metadataDirectoryPath)
+    if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+      FileFactory.mkdirs(metadataDirectoryPath, fileType)
+    }
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 7f6e88a..4746ecf 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -173,6 +175,14 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
     carbonLoadModel.setMaxColumns("100")
+    // Create table and metadata folders if not exist
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val fileType = FileFactory.getFileType(metadataDirectoryPath)
+    if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+      FileFactory.mkdirs(metadataDirectoryPath, fileType)
+    }
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 316881d..f46282d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -195,6 +195,7 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll {
   }
 
   test("test create table with complex datatype") {
+    sql("drop table if exists create_source")
     sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='create_source')")
     sql("drop table create_source")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a8fac30/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 8cdcd26..1788ccb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -256,19 +256,18 @@ public class CarbonCompactionUtil {
   /**
    * This will check if any compaction request has been received for any table.
    *
-   * @param tableMetas
+   * @param carbonTables
    * @return
    */
-  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+  public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
       List<CarbonTableIdentifier> skipList) {
-    for (TableMeta table : tableMetas) {
-      CarbonTable ctable = table.carbonTable;
+    for (CarbonTable ctable : carbonTables) {
       String metadataPath = ctable.getMetaDataFilepath();
       // check for the compaction required file and at the same time exclude the tables which are
       // present in the skip list.
       if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
-          .contains(table.carbonTableIdentifier)) {
-        return table;
+          .contains(ctable.getCarbonTableIdentifier())) {
+        return ctable;
       }
     }
     return null;