You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:40 UTC

[02/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b7673db..9353a92 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -19,426 +19,37 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 import java.text.SimpleDateFormat
-import java.util
-import java.util.UUID
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.language.implicitConversions
-import scala.util.Random
 
-import org.apache.spark.SparkEnv
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
-import org.apache.spark.sql.hive.{CarbonHiveMetadataUtil, HiveContext}
+import org.apache.spark.sql.hive.CarbonMetastore
 import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
-import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
-import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
-ColumnSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.integration.spark.merger.CompactionType
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
-GlobalDictionaryUtil}
-
-case class tableModel(
-    ifNotExistsSet: Boolean,
-    var databaseName: String,
-    databaseNameOp: Option[String],
-    tableName: String,
-    tableProperties: Map[String, String],
-    dimCols: Seq[Field],
-    msrCols: Seq[Field],
-    highcardinalitydims: Option[Seq[String]],
-    noInvertedIdxCols: Option[Seq[String]],
-    partitioner: Option[Partitioner],
-    columnGroups: Seq[String],
-    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
-
-case class Field(column: String, var dataType: Option[String], name: Option[String],
-    children: Option[List[Field]], parent: String = null,
-    storeType: Option[String] = Some("columnar"),
-    var precision: Int = 0, var scale: Int = 0)
-
-case class ColumnProperty(key: String, value: String)
-
-case class ComplexField(complexType: String, primitiveField: Option[Field],
-    complexField: Option[ComplexField])
-
-case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
-    nodeList: Array[String])
-
-case class PartitionerField(partitionColumn: String, dataType: Option[String],
-    columnComment: String)
-
-case class DataLoadTableFileMapping(table: String, loadPath: String)
-
-case class CarbonMergerMapping(storeLocation: String,
-    storePath: String,
-    metadataFilePath: String,
-    mergedLoadName: String,
-    kettleHomePath: String,
-    tableCreationTime: Long,
-    databaseName: String,
-    factTableName: String,
-    validSegments: Array[String],
-    tableId: String,
-    // maxSegmentColCardinality is Cardinality of last segment of compaction
-    var maxSegmentColCardinality: Array[Int],
-    // maxSegmentColumnSchemaList is list of column schema of last segment of compaction
-    var maxSegmentColumnSchemaList: List[ColumnSchema])
-
-case class NodeInfo(TaskId: String, noOfBlocks: Int)
-
-case class AlterTableModel(dbName: Option[String], tableName: String,
-    compactionType: String, alterSql: String)
-
-case class CompactionModel(compactionSize: Long,
-    compactionType: CompactionType,
-    carbonTable: CarbonTable,
-    tableCreationTime: Long,
-    isDDLTrigger: Boolean)
-
-case class CompactionCallableModel(storePath: String,
-    carbonLoadModel: CarbonLoadModel,
-    storeLocation: String,
-    carbonTable: CarbonTable,
-    kettleHomePath: String,
-    cubeCreationTime: Long,
-    loadsToMerge: util.List[LoadMetadataDetails],
-    sqlContext: SQLContext,
-    compactionType: CompactionType)
-
-object TableNewProcessor {
-  def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
-    new TableNewProcessor(cm, sqlContext).process
-  }
-}
-
-class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
-
-  var index = 0
-  var rowGroup = 0
-
-  def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
-    var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
-    fieldChildren.foreach(fields => {
-      fields.foreach(field => {
-        val encoders = new java.util.ArrayList[Encoding]()
-        encoders.add(Encoding.DICTIONARY)
-        val columnSchema: ColumnSchema = getColumnSchema(
-          DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-          field.name.getOrElse(field.column), index,
-          isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale)
-        allColumns ++= Seq(columnSchema)
-        index = index + 1
-        rowGroup = rowGroup + 1
-        if (field.children.get != null) {
-          columnSchema.setNumberOfChild(field.children.get.size)
-          allColumns ++= getAllChildren(field.children)
-        }
-      })
-    })
-    allColumns
-  }
-
-  def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
-      encoders: java.util.List[Encoding], isDimensionCol: Boolean,
-      colGroup: Integer, precision: Integer, scale: Integer): ColumnSchema = {
-    val columnSchema = new ColumnSchema()
-    columnSchema.setDataType(dataType)
-    columnSchema.setColumnName(colName)
-    val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
-    if (highCardinalityDims.contains(colName)) {
-      encoders.remove(encoders.remove(Encoding.DICTIONARY))
-    }
-    if (dataType == DataType.TIMESTAMP) {
-      encoders.add(Encoding.DIRECT_DICTIONARY)
-    }
-    val colPropMap = new java.util.HashMap[String, String]()
-    if (cm.colProps.isDefined && null != cm.colProps.get.get(colName)) {
-      val colProps = cm.colProps.get.get(colName)
-      colProps.asScala.foreach { x => colPropMap.put(x.key, x.value) }
-    }
-    columnSchema.setColumnProperties(colPropMap)
-    columnSchema.setEncodingList(encoders)
-    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
-    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
-      columnSchema)
-    columnSchema.setColumnUniqueId(columnUniqueId)
-    columnSchema.setColumnReferenceId(columnUniqueId)
-    columnSchema.setColumnar(isCol)
-    columnSchema.setDimensionColumn(isDimensionCol)
-    columnSchema.setColumnGroup(colGroup)
-    columnSchema.setPrecision(precision)
-    columnSchema.setScale(scale)
-    // TODO: Need to fill RowGroupID, converted type
-    // & Number of Children after DDL finalization
-    columnSchema
-  }
-
-  // process create dml fields and create wrapper TableInfo object
-  def process: TableInfo = {
-    val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
-    var allColumns = Seq[ColumnSchema]()
-    var index = 0
-    cm.dimCols.foreach(field => {
-      val encoders = new java.util.ArrayList[Encoding]()
-      encoders.add(Encoding.DICTIONARY)
-      val columnSchema: ColumnSchema = getColumnSchema(
-        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-        field.name.getOrElse(field.column),
-        index,
-        isCol = true,
-        encoders,
-        isDimensionCol = true,
-        -1,
-        field.precision,
-        field.scale)
-      allColumns ++= Seq(columnSchema)
-      index = index + 1
-      if (field.children.isDefined && field.children.get != null) {
-        columnSchema.setNumberOfChild(field.children.get.size)
-        allColumns ++= getAllChildren(field.children)
-      }
-    })
-
-    cm.msrCols.foreach(field => {
-      val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(
-        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-        field.name.getOrElse(field.column),
-        index,
-        isCol = true,
-        encoders,
-        isDimensionCol = false,
-        -1,
-        field.precision,
-        field.scale)
-      val measureCol = columnSchema
-
-      allColumns ++= Seq(measureCol)
-      index = index + 1
-    })
-
-    // Check if there is any duplicate measures or dimensions.
-    // Its based on the dimension name and measure name
-    allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
-      val name = f._1
-      LOGGER.error(s"Duplicate column found with name: $name")
-      LOGGER.audit(
-        s"Validation failed for Create/Alter Table Operation " +
-        s"for ${ cm.databaseName }.${ cm.tableName }" +
-        s"Duplicate column found with name: $name")
-      sys.error(s"Duplicate dimensions found with name: $name")
-    })
-
-    val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
-
-    checkColGroupsValidity(cm.columnGroups, allColumns, highCardinalityDims)
-
-    updateColumnGroupsInFields(cm.columnGroups, allColumns)
-
-    var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    for (column <- allColumns) {
-      if (highCardinalityDims.contains(column.getColumnName)) {
-        newOrderedDims += column
-      } else if (column.isComplex) {
-        complexDims += column
-      } else if (column.isDimensionColumn) {
-        newOrderedDims += column
-      } else {
-        measures += column
-      }
-
-    }
-
-    // Setting the boolean value of useInvertedIndex in column schema
-    val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
-    for (column <- allColumns) {
-      // When the column is measure or the specified no inverted index column in DDL,
-      // set useInvertedIndex to false, otherwise true.
-      if (noInvertedIndexCols.contains(column.getColumnName) ||
-          cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
-        column.setUseInvertedIndex(false)
-      } else {
-        column.setUseInvertedIndex(true)
-      }
-    }
-
-    // Adding dummy measure if no measure is provided
-    if (measures.size < 1) {
-      val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
-        CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
-        index,
-        true,
-        encoders,
-        false,
-        -1, 0, 0)
-      columnSchema.setInvisible(true)
-      val measureColumn = columnSchema
-      measures += measureColumn
-      allColumns = allColumns ++ measures
-    }
-    val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
-    columnValidator.validateColumns(allColumns)
-    newOrderedDims = newOrderedDims ++ complexDims ++ measures
-
-    cm.partitioner match {
-      case Some(part: Partitioner) =>
-        var definedpartCols = part.partitionColumn
-        val columnBuffer = new ArrayBuffer[String]
-        part.partitionColumn.foreach { col =>
-          newOrderedDims.foreach { dim =>
-            if (dim.getColumnName.equalsIgnoreCase(col)) {
-              definedpartCols = definedpartCols.dropWhile { c => c.equals(col) }
-              columnBuffer += col
-            }
-          }
-        }
-
-        // Special Case, where Partition count alone is sent to Carbon for dataloading
-        if (part.partitionClass.isEmpty) {
-          if (part.partitionColumn(0).isEmpty) {
-            Partitioner(
-              "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
-              Array(""), part.partitionCount, null)
-          } else {
-            // case where partition cols are set and partition class is not set.
-            // so setting the default value.
-            Partitioner(
-              "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
-              part.partitionColumn, part.partitionCount, null)
-          }
-        } else if (definedpartCols.nonEmpty) {
-          val msg = definedpartCols.mkString(", ")
-          LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
-          LOGGER.audit(
-            s"Validation failed for Create/Alter Table Operation for " +
-            s"${ cm.databaseName }.${ cm.tableName } " +
-            s"partition columns specified are not part of Dimension columns: $msg")
-          sys.error(s"partition columns specified are not part of Dimension columns: $msg")
-        } else {
-
-          try {
-            Class.forName(part.partitionClass).newInstance()
-          } catch {
-            case e: Exception =>
-              val cl = part.partitionClass
-              LOGGER.audit(
-                s"Validation failed for Create/Alter Table Operation for " +
-                s"${ cm.databaseName }.${ cm.tableName } " +
-                s"partition class specified can not be found or loaded: $cl")
-              sys.error(s"partition class specified can not be found or loaded: $cl")
-          }
-
-          Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
-        }
-      case None =>
-        Partitioner("org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
-          Array(""), 20, null)
-    }
-    val tableInfo = new TableInfo()
-    val tableSchema = new TableSchema()
-    val schemaEvol = new SchemaEvolution()
-    schemaEvol
-      .setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]())
-    tableSchema.setTableId(UUID.randomUUID().toString)
-    // populate table properties map
-    val tablePropertiesMap = new java.util.HashMap[String, String]()
-    cm.tableProperties.foreach {
-      x => tablePropertiesMap.put(x._1, x._2)
-    }
-    tableSchema.setTableProperties(tablePropertiesMap)
-    tableSchema.setTableName(cm.tableName)
-    tableSchema.setListOfColumns(allColumns.asJava)
-    tableSchema.setSchemaEvalution(schemaEvol)
-    tableInfo.setDatabaseName(cm.databaseName)
-    tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName)
-    tableInfo.setLastUpdatedTime(System.currentTimeMillis())
-    tableInfo.setFactTable(tableSchema)
-    tableInfo.setAggregateTableList(new util.ArrayList[TableSchema]())
-    tableInfo
-  }
-
-  //  For checking if the specified col group columns are specified in fields list.
-  protected def checkColGroupsValidity(colGrps: Seq[String],
-      allCols: Seq[ColumnSchema],
-      highCardCols: Seq[String]): Unit = {
-    if (null != colGrps) {
-      colGrps.foreach(columngroup => {
-        val rowCols = columngroup.split(",")
-        rowCols.foreach(colForGrouping => {
-          var found: Boolean = false
-          // check for dimensions + measures
-          allCols.foreach(eachCol => {
-            if (eachCol.getColumnName.equalsIgnoreCase(colForGrouping.trim())) {
-              found = true
-            }
-          })
-          // check for No Dicitonary dimensions
-          highCardCols.foreach(noDicCol => {
-            if (colForGrouping.trim.equalsIgnoreCase(noDicCol)) {
-              found = true
-            }
-          })
-
-          if (!found) {
-            sys.error(s"column $colForGrouping is not present in Field list")
-          }
-        })
-      })
-    }
-  }
-
-  // For updating the col group details for fields.
-  private def updateColumnGroupsInFields(colGrps: Seq[String], allCols: Seq[ColumnSchema]): Unit = {
-    if (null != colGrps) {
-      var colGroupId = -1
-      colGrps.foreach(columngroup => {
-        colGroupId += 1
-        val rowCols = columngroup.split(",")
-        rowCols.foreach(row => {
-
-          allCols.foreach(eachCol => {
-
-            if (eachCol.getColumnName.equalsIgnoreCase(row.trim)) {
-              eachCol.setColumnGroup(colGroupId)
-              eachCol.setColumnar(false)
-            }
-          })
-        })
-      })
-    }
-  }
-}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
 /**
  * Command for the compaction in alter table command
@@ -459,7 +70,7 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
     }
 
     val relation =
-      CarbonEnv.getInstance(sqlContext).carbonCatalog
+      CarbonEnv.get.carbonMetastore
         .lookupRelation1(Option(databaseName), tableName)(sqlContext)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
@@ -478,7 +89,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
-    val partitioner = relation.tableMeta.partitioner
     val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
 
     var storeLocation = CarbonProperties.getInstance
@@ -507,7 +117,7 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
   }
 }
 
-case class CreateTable(cm: tableModel) extends RunnableCommand {
+case class CreateTable(cm: TableModel) extends RunnableCommand {
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -531,7 +141,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
       }
     } else {
       // Add Database to catalog and persist
-      val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
+      val catalog = CarbonEnv.get.carbonMetastore
       // Need to fill partitioner class when we support partition
       val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName, null)(sqlContext)
       try {
@@ -544,7 +154,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
           val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
           // call the drop table to delete the created table.
 
-          CarbonEnv.getInstance(sqlContext).carbonCatalog
+          CarbonEnv.get.carbonMetastore
             .dropTable(catalog.storePath, identifier)(sqlContext)
 
           LOGGER.audit(s"Table creation with Database name [$dbName] " +
@@ -581,7 +191,7 @@ private[sql] case class DeleteLoadsById(
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
 
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
+    val relation = CarbonEnv.get.carbonMetastore.lookupRelation1(
       identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     if (relation == null) {
       LOGGER.audit(s"Delete segment by Id is failed. Table $dbName.$tableName does not exist")
@@ -591,7 +201,7 @@ private[sql] case class DeleteLoadsById(
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
 
     if (null == carbonTable) {
-      CarbonEnv.getInstance(sqlContext).carbonCatalog
+      CarbonEnv.get.carbonMetastore
         .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     }
     val path = carbonTable.getMetaDataFilepath
@@ -640,7 +250,7 @@ private[sql] case class DeleteLoadsByLoadDate(
     LOGGER.audit("The delete segment by load date request has been received.")
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+    val relation = CarbonEnv.get.carbonMetastore
       .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     if (relation == null) {
       LOGGER
@@ -658,7 +268,7 @@ private[sql] case class DeleteLoadsByLoadDate(
     val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
       .getCarbonTable(dbName + '_' + tableName)
     if (null == carbonTable) {
-      var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+      var relation = CarbonEnv.get.carbonMetastore
         .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
     }
     val path = carbonTable.getMetaDataFilepath()
@@ -683,6 +293,44 @@ private[sql] case class DeleteLoadsByLoadDate(
 
 }
 
+object LoadTable {
+
+  def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+      model.table)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+    // read TableInfo
+    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+    // modify TableInfo
+    val columns = tableInfo.getFact_table.getTable_columns
+    for (i <- 0 until columns.size) {
+      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+      }
+    }
+
+    // write TableInfo
+    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+    // update Metadata
+    val catalog = CarbonEnv.get.carbonMetastore
+    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+    // update CarbonDataLoadSchema
+    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
+      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+  }
+
+}
+
 case class LoadTable(
     databaseNameOp: Option[String],
     tableName: String,
@@ -699,7 +347,6 @@ case class LoadTable(
   def run(sqlContext: SQLContext): Seq[Row] = {
 
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    val identifier = TableIdentifier(tableName, Option(dbName))
     if (isOverwriteExist) {
       sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
@@ -710,7 +357,7 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+    val relation = CarbonEnv.get.carbonMetastore
       .lookupRelation1(Option(dbName), tableName)(sqlContext)
       .asInstanceOf[CarbonRelation]
     if (relation == null) {
@@ -725,15 +372,13 @@ case class LoadTable(
     try {
       if (carbonLock.lockWithRetries()) {
         logInfo("Successfully able to get the table metadata file lock")
-      }
-      else {
+      } else {
         sys.error("Table is locked for updation. Please try after some time")
       }
 
       val factPath = if (dataFrame.isDefined) {
         ""
-      }
-      else {
+      } else {
         FileUtils.getPaths(
           CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
       }
@@ -743,8 +388,7 @@ case class LoadTable(
       carbonLoadModel.setStorePath(relation.tableMeta.storePath)
       if (dimFilesPath.isEmpty) {
         carbonLoadModel.setDimFolderPath(null)
-      }
-      else {
+      } else {
         val x = dimFilesPath.map(f => f.table + ":" + CarbonUtil.checkAndAppendHDFSUrl(f.loadPath))
         carbonLoadModel.setDimFolderPath(x.mkString(","))
       }
@@ -755,9 +399,8 @@ case class LoadTable(
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
 
-      var partitionLocation = relation.tableMeta.storePath + "/partition/" +
+      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
                               relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
                               relation.tableMeta.carbonTableIdentifier.getTableName + "/"
 
@@ -830,18 +473,16 @@ case class LoadTable(
       // set local dictionary path, and dictionary file extension
       carbonLoadModel.setAllDictPath(allDictionaryPath)
 
-      var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       try {
         // First system has to partition the data first and then call the load data
-        if (null == relation.tableMeta.partitioner.partitionColumn ||
-            relation.tableMeta.partitioner.partitionColumn(0).isEmpty) {
-          LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-          carbonLoadModel.setFactFilePath(factPath)
-          carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
-          carbonLoadModel.setCsvHeader(fileHeader)
-          carbonLoadModel.setColDictFilePath(columnDict)
-          carbonLoadModel.setDirectLoad(true)
-        }
+        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+        carbonLoadModel.setFactFilePath(factPath)
+        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
+        carbonLoadModel.setCsvHeader(fileHeader)
+        carbonLoadModel.setColDictFilePath(columnDict)
+        carbonLoadModel.setDirectLoad(true)
+        GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
         GlobalDictionaryUtil
           .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
             dataFrame)
@@ -849,7 +490,6 @@ case class LoadTable(
             carbonLoadModel,
             relation.tableMeta.storePath,
             kettleHomePath,
-            relation.tableMeta.partitioner,
             columinar,
             partitionStatus,
             useKettle,
@@ -934,7 +574,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
-    val storePath = CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath
+    val storePath = CarbonEnv.get.carbonMetastore.storePath
     var isLocked = false
     try {
       isLocked = carbonLock.lockWithRetries()
@@ -946,7 +586,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
         sys.error("Table is locked for deletion. Please try after some time")
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      CarbonEnv.getInstance(sqlContext).carbonCatalog.dropTable(storePath, identifier)(sqlContext)
+      CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } finally {
       if (carbonLock != null && isLocked) {
@@ -988,7 +628,7 @@ private[sql] case class ShowLoads(
     val tableUniqueName = databaseName + "_" + tableName
     // Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata if
     // schema is changed by other process, so that tableInfoMap woulb be refilled.
-    val tableExists = CarbonEnv.getInstance(sqlContext).carbonCatalog
+    val tableExists = CarbonEnv.get.carbonMetastore
       .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
     if (!tableExists) {
       sys.error(s"$databaseName.$tableName is not found")
@@ -1045,7 +685,7 @@ private[sql] case class DescribeCommandFormatted(
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+    val relation = CarbonEnv.get.carbonMetastore
       .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
     val mapper = new ObjectMapper()
     val colProps = StringBuilder.newBuilder
@@ -1130,7 +770,7 @@ private[sql] case class DeleteLoadByDate(
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+    val relation = CarbonEnv.get.carbonMetastore
       .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
     var level: String = ""
     val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
@@ -1156,7 +796,7 @@ private[sql] case class DeleteLoadByDate(
       new CarbonDataLoadSchema(carbonTable),
       dbName,
       tableName,
-      CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath,
+      CarbonEnv.get.carbonMetastore.storePath,
       level,
       actualColName,
       dateValue)
@@ -1176,7 +816,7 @@ private[sql] case class CleanFiles(
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+    val relation = CarbonEnv.get.carbonMetastore
       .lookupRelation1(identifier)(sqlContext).
       asInstanceOf[CarbonRelation]
     if (relation == null) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
new file mode 100644
index 0000000..bee891c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io._
+import java.util.{GregorianCalendar, UUID}
+
+import scala.Array.canBuildFrom
+import scala.collection.mutable.ArrayBuffer
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.sql.hive.client.ClientInterface
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType
+import org.apache.carbondata.core.reader.ThriftReader
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.lcm.locks.ZookeeperInit
+import org.apache.carbondata.spark.merger.TableMeta
+
+case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
+
+case class CarbonMetaData(dims: Seq[String],
+    msrs: Seq[String],
+    carbonTable: CarbonTable,
+    dictionaryMap: DictionaryMap)
+
+object CarbonMetastore {
+
+  def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
+    val createTBase = new ThriftReader.TBaseCreator() {
+      override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
+        new TableInfo()
+      }
+    }
+    val thriftReader = new ThriftReader(schemaFilePath, createTBase)
+    var tableInfo: TableInfo = null
+    try {
+      thriftReader.open()
+      tableInfo = thriftReader.read().asInstanceOf[TableInfo]
+    } finally {
+      thriftReader.close()
+    }
+    tableInfo
+  }
+
+  def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
+    val thriftWriter = new ThriftWriter(schemaFilePath, false)
+    try {
+      thriftWriter.open()
+      thriftWriter.write(tableInfo);
+    } finally {
+      thriftWriter.close()
+    }
+  }
+
+}
+
+case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
+  def get(name: String): Option[Boolean] = {
+    dictionaryMap.get(name.toLowerCase)
+  }
+}
+
+class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
+    client: ClientInterface, queryId: String) extends HiveMetastoreCatalog(client, hiveContext) {
+
+  @transient
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
+
+  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
+  tableModifiedTimeStore
+    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
+
+  val metadata = loadMetadata(storePath)
+
+  def getTableCreationTime(databaseName: String, tableName: String): Long = {
+    val tableMeta = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+    val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime
+    tableCreationTime
+  }
+
+  def lookupRelation1(dbName: Option[String],
+      tableName: String)(sqlContext: SQLContext): LogicalPlan = {
+    lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
+  }
+
+  def lookupRelation1(tableIdentifier: TableIdentifier,
+      alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+    val tables = getTableFromMetadata(database, tableIdentifier.table)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableIdentifier.table,
+          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext)
+      case None =>
+        LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
+        throw new NoSuchTableException
+    }
+  }
+
+  /**
+   * This method will search for a table in the catalog metadata
+   *
+   * @param database
+   * @param tableName
+   * @return
+   */
+  def getTableFromMetadata(database: String,
+      tableName: String): Option[TableMeta] = {
+    metadata.tablesMeta
+      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+                 c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+  }
+
+  def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+    val tables = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    tables.nonEmpty
+  }
+
+  def loadMetadata(metadataPath: String): MetaData = {
+    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val statistic = new QueryStatistic()
+    // creating zookeeper instance once.
+    // if zookeeper is configured as carbon lock type.
+    val zookeeperUrl: String = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
+    if (zookeeperUrl != null) {
+      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
+      ZookeeperInit.getInstance(zookeeperUrl)
+      LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
+      var configuredLockType = CarbonProperties.getInstance
+        .getProperty(CarbonCommonConstants.LOCK_TYPE)
+      if (null == configuredLockType) {
+        configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
+        CarbonProperties.getInstance
+          .addProperty(CarbonCommonConstants.LOCK_TYPE,
+            configuredLockType)
+      }
+    }
+
+    if (metadataPath == null) {
+      return null
+    }
+    val fileType = FileFactory.getFileType(metadataPath)
+    val metaDataBuffer = new ArrayBuffer[TableMeta]
+    fillMetaData(metadataPath, fileType, metaDataBuffer)
+    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
+      System.currentTimeMillis())
+    recorder.recordStatisticsForDriver(statistic, queryId)
+    MetaData(metaDataBuffer)
+  }
+
+  private def fillMetaData(basePath: String, fileType: FileType,
+      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+    val databasePath = basePath // + "/schemas"
+    try {
+      if (FileFactory.isFileExist(databasePath, fileType)) {
+        val file = FileFactory.getCarbonFile(databasePath, fileType)
+        val databaseFolders = file.listFiles()
+
+        databaseFolders.foreach(databaseFolder => {
+          if (databaseFolder.isDirectory) {
+            val dbName = databaseFolder.getName
+            val tableFolders = databaseFolder.listFiles()
+
+            tableFolders.foreach(tableFolder => {
+              if (tableFolder.isDirectory) {
+                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
+                  tableFolder.getName, UUID.randomUUID().toString)
+                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
+                  carbonTableIdentifier)
+                val tableMetadataFile = carbonTablePath.getSchemaFilePath
+
+                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+                  val tableName = tableFolder.getName
+                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
+
+
+                  val createTBase = new ThriftReader.TBaseCreator() {
+                    override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
+                      new TableInfo()
+                    }
+                  }
+                  val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
+                  thriftReader.open()
+                  val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
+                  thriftReader.close()
+
+                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
+                  val wrapperTableInfo = schemaConverter
+                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+                  val schemaFilePath = CarbonStorePath
+                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+                  wrapperTableInfo.setStorePath(storePath)
+                  wrapperTableInfo
+                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+                  val carbonTable =
+                    org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+                      .getCarbonTable(tableUniqueName)
+                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+                    carbonTable)
+                }
+              }
+            })
+          }
+        })
+      } else {
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+    }
+  }
+
+  /**
+   *
+   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
+   * Load CarbonTable from wrapper tableinfo
+   *
+   */
+  def createTableFromThrift(
+      tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
+      dbName: String, tableName: String, partitioner: Partitioner)
+    (sqlContext: SQLContext): String = {
+    if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
+      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+    }
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val thriftTableInfo = schemaConverter
+      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
+    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+      .add(schemaEvolutionEntry)
+
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+      tableInfo.getFactTable.getTableId)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(storePath)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+      CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
+
+    val fileType = FileFactory.getFileType(schemaMetadataPath)
+    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+      FileFactory.mkdirs(schemaMetadataPath, fileType)
+    }
+    val thriftWriter = new ThriftWriter(schemaFilePath, false)
+    thriftWriter.open()
+    thriftWriter.write(thriftTableInfo)
+    thriftWriter.close()
+    metadata.tablesMeta += tableMeta
+    logInfo(s"Table $tableName for Database $dbName created successfully.")
+    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+    carbonTablePath.getPath
+  }
+
+  private def updateMetadataByWrapperTable(
+      wrapperTableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo): Unit = {
+
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      wrapperTableInfo.getTableUniqueName)
+    for (i <- metadata.tablesMeta.indices) {
+      if (wrapperTableInfo.getTableUniqueName.equals(
+        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
+        metadata.tablesMeta(i).carbonTable = carbonTable
+      }
+    }
+  }
+
+  def updateMetadataByThriftTable(schemaFilePath: String,
+      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+
+    tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+      .setTime_stamp(System.currentTimeMillis())
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+    wrapperTableInfo
+      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+    wrapperTableInfo.setStorePath(storePath)
+    updateMetadataByWrapperTable(wrapperTableInfo)
+  }
+
+  /**
+   * Shows all tables for given schema.
+   */
+  def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
+
+    val dbName =
+      databaseName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
+    checkSchemasModifiedTimeAndReloadTables()
+    metadata.tablesMeta.filter { c =>
+      c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName)
+    }.map { c => (c.carbonTableIdentifier.getTableName, false) }
+  }
+
+  def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
+    val dbName = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+    val tableName = tableIdentifier.table
+
+    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+    val fileType = FileFactory.getFileType(tablePath)
+    FileFactory.isFileExist(tablePath, fileType)
+  }
+
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sqlContext: SQLContext) {
+    val dbName = tableIdentifier.database.get
+    val tableName = tableIdentifier.table
+
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+
+    val fileType = FileFactory.getFileType(metadataFilePath)
+
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      // while drop we should refresh the schema modified time so that if any thing has changed
+      // in the other beeline need to update.
+      checkSchemasModifiedTimeAndReloadTables
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+        tableIdentifier.table)
+      metadataToBeRemoved match {
+        case Some(tableMeta) =>
+          metadata.tablesMeta -= tableMeta
+          org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+            .removeTable(dbName + "_" + tableName)
+          org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+            .removeTable(dbName + "_" + tableName)
+          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+        case None =>
+          logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
+      }
+      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
+      // discard cached table info in cachedDataSourceTables
+      sqlContext.catalog.refreshTable(tableIdentifier)
+    }
+  }
+
+  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+    val timestampFileType = FileFactory.getFileType(timestampFile)
+    (timestampFile, timestampFileType)
+  }
+
+  /**
+   * This method will put the updated timestamp of schema file in the table modified time store map
+   *
+   * @param timeStamp
+   */
+  def updateSchemasUpdatedTime(timeStamp: Long) {
+    tableModifiedTimeStore.put("default", timeStamp)
+  }
+
+  /**
+   * This method will read the timestamp of empty schema file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+    } else {
+      System.currentTimeMillis()
+    }
+  }
+
+  /**
+   * This method will check and create an empty schema timestamp file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      FileFactory.createNewFile(timestampFile, timestampFileType)
+    }
+    val systemTime = System.currentTimeMillis()
+    FileFactory.getCarbonFile(timestampFile, timestampFileType)
+      .setLastModifiedTime(systemTime)
+    systemTime
+  }
+
+  def checkSchemasModifiedTimeAndReloadTables() {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
+        getLastModifiedTime ==
+            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+        refreshCache()
+      }
+    }
+  }
+
+  def refreshCache() {
+    metadata.tablesMeta = loadMetadata(storePath).tablesMeta
+  }
+
+  def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
+    var schemaLastUpdatedTime = System.currentTimeMillis
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
+        .getLastModifiedTime
+    }
+    schemaLastUpdatedTime
+  }
+
+  def createDatabaseDirectory(dbName: String) {
+    val databasePath = storePath + File.separator + dbName
+    val fileType = FileFactory.getFileType(databasePath)
+    FileFactory.mkdirs(databasePath, fileType)
+  }
+
+  def dropDatabaseDirectory(dbName: String) {
+    val databasePath = storePath + File.separator + dbName
+    val fileType = FileFactory.getFileType(databasePath)
+    if (FileFactory.isFileExist(databasePath, fileType)) {
+      val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
+      CarbonUtil.deleteFoldersAndFiles(dbPath)
+    }
+  }
+
+}
+
+
+object CarbonMetastoreTypes extends RegexParsers {
+  protected lazy val primitiveType: Parser[DataType] =
+    "string" ^^^ StringType |
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ShortType |
+    "short" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "long" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |
+    "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+    "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "timestamp" ^^^ TimestampType
+
+  protected lazy val fixedDecimalType: Parser[DataType] =
+    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+      case precision ~ scale =>
+        DecimalType(precision.toInt, scale.toInt)
+    }
+
+  protected lazy val arrayType: Parser[DataType] =
+    "array" ~> "<" ~> dataType <~ ">" ^^ {
+      case tpe => ArrayType(tpe)
+    }
+
+  protected lazy val mapType: Parser[DataType] =
+    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+      case t1 ~ _ ~ t2 => MapType(t1, t2)
+    }
+
+  protected lazy val structField: Parser[StructField] =
+    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+    }
+
+  protected lazy val structType: Parser[DataType] =
+    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+      case fields => StructType(fields)
+    }
+
+  protected lazy val dataType: Parser[DataType] =
+    arrayType |
+    mapType |
+    structType |
+    primitiveType
+
+  def toDataType(metastoreType: String): DataType = {
+    parseAll(dataType, metastoreType) match {
+      case Success(result, _) => result
+      case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
+    }
+  }
+
+  def toMetastoreType(dt: DataType): String = {
+    dt match {
+      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+      case StructType(fields) =>
+        s"struct<${
+          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+            .mkString(",")
+        }>"
+      case StringType => "string"
+      case FloatType => "float"
+      case IntegerType => "int"
+      case ShortType => "tinyint"
+      case DoubleType => "double"
+      case LongType => "bigint"
+      case BinaryType => "binary"
+      case BooleanType => "boolean"
+      case DecimalType() => "decimal"
+      case TimestampType => "timestamp"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
deleted file mode 100644
index d219bcb..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io._
-import java.util.{GregorianCalendar, UUID}
-
-import scala.Array.canBuildFrom
-import scala.collection.mutable.ArrayBuffer
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.RegexParsers
-
-import org.apache.spark
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.client.ClientInterface
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType
-import org.apache.carbondata.core.reader.ThriftReader
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.core.writer.ThriftWriter
-import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.lcm.locks.ZookeeperInit
-import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
-
-case class CarbonMetaData(dims: Seq[String],
-    msrs: Seq[String],
-    carbonTable: CarbonTable,
-    dictionaryMap: DictionaryMap)
-
-case class TableMeta(carbonTableIdentifier: CarbonTableIdentifier, storePath: String,
-    var carbonTable: CarbonTable, partitioner: Partitioner)
-
-object CarbonMetastoreCatalog {
-
-  def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
-    val createTBase = new ThriftReader.TBaseCreator() {
-      override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-        new TableInfo()
-      }
-    }
-    val thriftReader = new ThriftReader(schemaFilePath, createTBase)
-    var tableInfo: TableInfo = null
-    try {
-      thriftReader.open()
-      tableInfo = thriftReader.read().asInstanceOf[TableInfo]
-    } finally {
-      thriftReader.close()
-    }
-    tableInfo
-  }
-
-  def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
-    val thriftWriter = new ThriftWriter(schemaFilePath, false)
-    try {
-      thriftWriter.open()
-      thriftWriter.write(tableInfo);
-    } finally {
-      thriftWriter.close()
-    }
-  }
-
-}
-
-case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
-  def get(name: String): Option[Boolean] = {
-    dictionaryMap.get(name.toLowerCase)
-  }
-}
-
-class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
-    client: ClientInterface, queryId: String) extends HiveMetastoreCatalog(client, hiveContext) {
-
-  @transient
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
-
-  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
-  tableModifiedTimeStore
-    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
-
-  val metadata = loadMetadata(storePath)
-
-  def getTableCreationTime(databaseName: String, tableName: String): Long = {
-    val tableMeta = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
-    val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime
-    tableCreationTime
-  }
-
-  def lookupRelation1(dbName: Option[String],
-      tableName: String)(sqlContext: SQLContext): LogicalPlan = {
-    lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
-  }
-
-  def lookupRelation1(tableIdentifier: TableIdentifier,
-      alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
-    val tables = getTableFromMetadata(database, tableIdentifier.table)
-    tables match {
-      case Some(t) =>
-        CarbonRelation(database, tableIdentifier.table,
-          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext)
-      case None =>
-        LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
-        throw new NoSuchTableException
-    }
-  }
-
-  /**
-   * This method will search for a table in the catalog metadata
-   *
-   * @param database
-   * @param tableName
-   * @return
-   */
-  def getTableFromMetadata(database: String,
-      tableName: String): Option[TableMeta] = {
-    metadata.tablesMeta
-      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-                 c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
-  }
-
-  def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
-    val tables = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
-    tables.nonEmpty
-  }
-
-  def loadMetadata(metadataPath: String): MetaData = {
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    // creating zookeeper instance once.
-    // if zookeeper is configured as carbon lock type.
-    val zookeeperUrl: String = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
-    if (zookeeperUrl != null) {
-      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
-      ZookeeperInit.getInstance(zookeeperUrl)
-      LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
-      var configuredLockType = CarbonProperties.getInstance
-        .getProperty(CarbonCommonConstants.LOCK_TYPE)
-      if (null == configuredLockType) {
-        configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
-        CarbonProperties.getInstance
-          .addProperty(CarbonCommonConstants.LOCK_TYPE,
-            configuredLockType)
-      }
-    }
-
-    if (metadataPath == null) {
-      return null
-    }
-    val fileType = FileFactory.getFileType(metadataPath)
-    val metaDataBuffer = new ArrayBuffer[TableMeta]
-    fillMetaData(metadataPath, fileType, metaDataBuffer)
-    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
-      System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    MetaData(metaDataBuffer)
-  }
-
-  private def fillMetaData(basePath: String, fileType: FileType,
-      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
-    val databasePath = basePath // + "/schemas"
-    try {
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val file = FileFactory.getCarbonFile(databasePath, fileType)
-        val databaseFolders = file.listFiles()
-
-        databaseFolders.foreach(databaseFolder => {
-          if (databaseFolder.isDirectory) {
-            val dbName = databaseFolder.getName
-            val tableFolders = databaseFolder.listFiles()
-
-            tableFolders.foreach(tableFolder => {
-              if (tableFolder.isDirectory) {
-                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                  tableFolder.getName, UUID.randomUUID().toString)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
-                  carbonTableIdentifier)
-                val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
-                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-                  val tableName = tableFolder.getName
-                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-
-
-                  val createTBase = new ThriftReader.TBaseCreator() {
-                    override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-                      new TableInfo()
-                    }
-                  }
-                  val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
-                  thriftReader.open()
-                  val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
-                  thriftReader.close()
-
-                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
-                  val wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
-                  val schemaFilePath = CarbonStorePath
-                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-                  wrapperTableInfo.setStorePath(storePath)
-                  wrapperTableInfo
-                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-                  val carbonTable =
-                    org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-                      .getCarbonTable(tableUniqueName)
-                  metaDataBuffer += TableMeta(
-                    carbonTable.getCarbonTableIdentifier,
-                    storePath,
-                    carbonTable,
-                    // TODO: Need to update Database thirft to hold partitioner
-                    // information and reload when required.
-                    Partitioner("org.apache.carbondata.spark.partition.api.impl." +
-                                "SampleDataPartitionerImpl",
-                      Array(""), 1, Array("")))
-                }
-              }
-            })
-          }
-        })
-      } else {
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-    }
-  }
-
-  /**
-   *
-   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
-   * Load CarbonTable from wrapper tableinfo
-   *
-   */
-  def createTableFromThrift(
-      tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
-      dbName: String, tableName: String, partitioner: Partitioner)
-    (sqlContext: SQLContext): String = {
-    if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
-      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
-    }
-    val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val thriftTableInfo = schemaConverter
-      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
-    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-      .add(schemaEvolutionEntry)
-
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = TableMeta(
-      carbonTableIdentifier,
-      storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName),
-      Partitioner("org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
-        Array(""), 1, DistributionUtil.getNodeList(hiveContext.sparkContext)))
-
-    val fileType = FileFactory.getFileType(schemaMetadataPath)
-    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      FileFactory.mkdirs(schemaMetadataPath, fileType)
-    }
-    val thriftWriter = new ThriftWriter(schemaFilePath, false)
-    thriftWriter.open()
-    thriftWriter.write(thriftTableInfo)
-    thriftWriter.close()
-    metadata.tablesMeta += tableMeta
-    logInfo(s"Table $tableName for Database $dbName created successfully.")
-    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
-  }
-
-  private def updateMetadataByWrapperTable(
-      wrapperTableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo): Unit = {
-
-    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      wrapperTableInfo.getTableUniqueName)
-    for (i <- metadata.tablesMeta.indices) {
-      if (wrapperTableInfo.getTableUniqueName.equals(
-        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
-        metadata.tablesMeta(i).carbonTable = carbonTable
-      }
-    }
-  }
-
-  def updateMetadataByThriftTable(schemaFilePath: String,
-      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
-
-    tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-      .setTime_stamp(System.currentTimeMillis())
-    val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
-    wrapperTableInfo
-      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-    wrapperTableInfo.setStorePath(storePath)
-    updateMetadataByWrapperTable(wrapperTableInfo)
-  }
-
-  /**
-   * Shows all tables for given schema.
-   */
-  def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
-
-    val dbName =
-      databaseName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
-    checkSchemasModifiedTimeAndReloadTables()
-    metadata.tablesMeta.filter { c =>
-      c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName)
-    }.map { c => (c.carbonTableIdentifier.getTableName, false) }
-  }
-
-  def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
-    val tableName = tableIdentifier.table
-
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
-    val fileType = FileFactory.getFileType(tablePath)
-    FileFactory.isFileExist(tablePath, fileType)
-  }
-
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
-    (sqlContext: SQLContext) {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
-
-    val fileType = FileFactory.getFileType(metadataFilePath)
-
-    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-      // while drop we should refresh the schema modified time so that if any thing has changed
-      // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables
-      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
-      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
-        tableIdentifier.table)
-      metadataToBeRemoved match {
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-            .removeTable(dbName + "_" + tableName)
-          org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-            .removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-        case None =>
-          logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
-      }
-      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
-      // discard cached table info in cachedDataSourceTables
-      sqlContext.catalog.refreshTable(tableIdentifier)
-    }
-  }
-
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
-    val timestampFileType = FileFactory.getFileType(timestampFile)
-    (timestampFile, timestampFileType)
-  }
-
-  /**
-   * This method will put the updated timestamp of schema file in the table modified time store map
-   *
-   * @param timeStamp
-   */
-  def updateSchemasUpdatedTime(timeStamp: Long) {
-    tableModifiedTimeStore.put("default", timeStamp)
-  }
-
-  /**
-   * This method will read the timestamp of empty schema file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-    } else {
-      System.currentTimeMillis()
-    }
-  }
-
-  /**
-   * This method will check and create an empty schema timestamp file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
-      FileFactory.createNewFile(timestampFile, timestampFileType)
-    }
-    val systemTime = System.currentTimeMillis()
-    FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .setLastModifiedTime(systemTime)
-    systemTime
-  }
-
-  def checkSchemasModifiedTimeAndReloadTables() {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
-        getLastModifiedTime ==
-            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
-        refreshCache()
-      }
-    }
-  }
-
-  def refreshCache() {
-    metadata.tablesMeta = loadMetadata(storePath).tablesMeta
-  }
-
-  def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
-    var schemaLastUpdatedTime = System.currentTimeMillis
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
-        .getLastModifiedTime
-    }
-    schemaLastUpdatedTime
-  }
-
-  def createDatabaseDirectory(dbName: String) {
-    val databasePath = storePath + File.separator + dbName
-    val fileType = FileFactory.getFileType(databasePath)
-    FileFactory.mkdirs(databasePath, fileType)
-  }
-
-  def dropDatabaseDirectory(dbName: String) {
-    val databasePath = storePath + File.separator + dbName
-    val fileType = FileFactory.getFileType(databasePath)
-    if (FileFactory.isFileExist(databasePath, fileType)) {
-      val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
-      CarbonUtil.deleteFoldersAndFiles(dbPath)
-    }
-  }
-
-}
-
-
-object CarbonMetastoreTypes extends RegexParsers {
-  protected lazy val primitiveType: Parser[DataType] =
-    "string" ^^^ StringType |
-    "float" ^^^ FloatType |
-    "int" ^^^ IntegerType |
-    "tinyint" ^^^ ShortType |
-    "short" ^^^ ShortType |
-    "double" ^^^ DoubleType |
-    "long" ^^^ LongType |
-    "binary" ^^^ BinaryType |
-    "boolean" ^^^ BooleanType |
-    fixedDecimalType |
-    "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
-    "varchar\\((\\d+)\\)".r ^^^ StringType |
-    "timestamp" ^^^ TimestampType
-
-  protected lazy val fixedDecimalType: Parser[DataType] =
-    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
-      case precision ~ scale =>
-        DecimalType(precision.toInt, scale.toInt)
-    }
-
-  protected lazy val arrayType: Parser[DataType] =
-    "array" ~> "<" ~> dataType <~ ">" ^^ {
-      case tpe => ArrayType(tpe)
-    }
-
-  protected lazy val mapType: Parser[DataType] =
-    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
-      case t1 ~ _ ~ t2 => MapType(t1, t2)
-    }
-
-  protected lazy val structField: Parser[StructField] =
-    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
-      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
-    }
-
-  protected lazy val structType: Parser[DataType] =
-    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
-      case fields => StructType(fields)
-    }
-
-  protected lazy val dataType: Parser[DataType] =
-    arrayType |
-    mapType |
-    structType |
-    primitiveType
-
-  def toDataType(metastoreType: String): DataType = {
-    parseAll(dataType, metastoreType) match {
-      case Success(result, _) => result
-      case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
-    }
-  }
-
-  def toMetastoreType(dt: DataType): String = {
-    dt match {
-      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
-      case StructType(fields) =>
-        s"struct<${
-          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
-            .mkString(",")
-        }>"
-      case StringType => "string"
-      case FloatType => "float"
-      case IntegerType => "int"
-      case ShortType => "tinyint"
-      case DoubleType => "double"
-      case LongType => "bigint"
-      case BinaryType => "binary"
-      case BooleanType => "boolean"
-      case DecimalType() => "decimal"
-      case TimestampType => "timestamp"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 1e944f7..44dbd6b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -34,10 +34,11 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
 import org.apache.spark.sql.hive.execution.command._
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types.IntegerType
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
@@ -121,7 +122,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
           scan
         }
 
-      if (projectList.map(_.toAttribute) == scan.attributesRaw &&
+      if (projectList.map(_.toAttribute) == scan.columnProjection &&
           projectSet.size == projectList.size &&
           filterSet.subsetOf(projectSet)) {
         // copied from spark pruneFilterProjectRaw
@@ -151,9 +152,9 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         predicates,
         useUnsafeCoversion = false)(sqlContext)
       projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
-      val updatedAttrs = scan.attributesRaw.map(attr =>
+      val updatedAttrs = scan.columnProjection.map(attr =>
         updateDataType(attr.asInstanceOf[AttributeReference], relation, projectExprsNeedToDecode))
-      scan.attributesRaw = updatedAttrs
+      scan.columnProjection = updatedAttrs
       if (projectExprsNeedToDecode.size() > 0
           && isDictionaryEncoded(projectExprsNeedToDecode.asScala.toSeq, relation)) {
         val decoder = getCarbonDecoder(logicalRelation,
@@ -235,15 +236,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
   object DDLStrategies extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case DropTable(tableName, ifNotExists)
-        if (CarbonEnv.getInstance(sqlContext).carbonCatalog
-            .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext)) =>
+        if CarbonEnv.get.carbonMetastore
+            .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
         val identifier = toTableIdentifier(tableName.toLowerCase)
         ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
       case ShowLoadsCommand(databaseName, table, limit) =>
         ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
       case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
       options, isOverwriteExist, inputSqlString, dataFrame) =>
-        val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
             .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
         if (isCarbonTable || options.nonEmpty) {
           ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
@@ -252,7 +253,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
           ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil
         }
       case alterTable@AlterTableCompaction(altertablemodel) =>
-        val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
             .tableExists(TableIdentifier(altertablemodel.tableName,
                  altertablemodel.dbName))(sqlContext)
         if (isCarbonTable) {
@@ -286,7 +287,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
           case e: Exception => ExecutedCommand(d) :: Nil
         }
       case DescribeFormattedCommand(sql, tblIdentifier) =>
-        val isTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
+        val isTable = CarbonEnv.get.carbonMetastore
             .tableExists(tblIdentifier)(sqlContext)
         if (isTable) {
           val describe =