You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:41 UTC

[03/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
deleted file mode 100644
index 9a4e209..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.io.{FileNotFoundException, IOException}
-import java.nio.charset.Charset
-import java.util.regex.Pattern
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.language.implicitConversions
-import scala.util.control.Breaks.{break, breakable}
-
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark.Accumulator
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-import org.apache.spark.util.FileUtils
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.path.CarbonStorePath
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.reader.CarbonDictionaryReader
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.rdd._
-
-/**
- * A object which provide a method to generate global dictionary from CSV files.
- */
-object GlobalDictionaryUtil {
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * The default separator to use if none is supplied to the constructor.
-   */
-  val DEFAULT_SEPARATOR: Char = ','
-  /**
-   * The default quote character to use if none is supplied to the
-   * constructor.
-   */
-  val DEFAULT_QUOTE_CHARACTER: Char = '"'
-
-  /**
-   * find columns which need to generate global dictionary.
-   *
-   * @param dimensions dimension list of schema
-   * @param headers    column headers
-   * @param columns    column list of csv file
-   */
-  def pruneDimensions(dimensions: Array[CarbonDimension],
-      headers: Array[String],
-      columns: Array[String]): (Array[CarbonDimension], Array[String]) = {
-    val dimensionBuffer = new ArrayBuffer[CarbonDimension]
-    val columnNameBuffer = new ArrayBuffer[String]
-    val dimensionsWithDict = dimensions.filter(hasEncoding(_, Encoding.DICTIONARY,
-      Encoding.DIRECT_DICTIONARY))
-    dimensionsWithDict.foreach { dim =>
-      breakable {
-        headers.zipWithIndex.foreach { h =>
-          if (dim.getColName.equalsIgnoreCase(h._1)) {
-            dimensionBuffer += dim
-            columnNameBuffer += columns(h._2)
-            break
-          }
-        }
-      }
-    }
-    (dimensionBuffer.toArray, columnNameBuffer.toArray)
-  }
-
-  /**
-   * use this method to judge whether CarbonDimension use some encoding or not
-   *
-   * @param dimension       carbonDimension
-   * @param encoding        the coding way of dimension
-   * @param excludeEncoding the coding way to exclude
-   */
-  def hasEncoding(dimension: CarbonDimension,
-      encoding: Encoding,
-      excludeEncoding: Encoding): Boolean = {
-    if (dimension.isComplex()) {
-      val children = dimension.getListOfChildDimensions
-      children.asScala.exists(hasEncoding(_, encoding, excludeEncoding))
-    } else {
-      dimension.hasEncoding(encoding) &&
-      (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))
-    }
-  }
-
-  def gatherDimensionByEncoding(carbonLoadModel: CarbonLoadModel,
-      dimension: CarbonDimension,
-      encoding: Encoding,
-      excludeEncoding: Encoding,
-      dimensionsWithEncoding: ArrayBuffer[CarbonDimension],
-      forPreDefDict: Boolean) {
-    if (dimension.isComplex) {
-      val children = dimension.getListOfChildDimensions.asScala
-      children.foreach { c =>
-        gatherDimensionByEncoding(carbonLoadModel, c, encoding, excludeEncoding,
-          dimensionsWithEncoding, forPreDefDict)
-      }
-    } else {
-      if (dimension.hasEncoding(encoding) &&
-          (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))) {
-        if ((forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) != null) ||
-            (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
-          dimensionsWithEncoding += dimension
-        }
-      }
-    }
-  }
-
-  def getPrimDimensionWithDict(carbonLoadModel: CarbonLoadModel,
-      dimension: CarbonDimension,
-      forPreDefDict: Boolean): Array[CarbonDimension] = {
-    val dimensionsWithDict = new ArrayBuffer[CarbonDimension]
-    gatherDimensionByEncoding(carbonLoadModel, dimension, Encoding.DICTIONARY,
-      Encoding.DIRECT_DICTIONARY,
-      dimensionsWithDict, forPreDefDict)
-    dimensionsWithDict.toArray
-  }
-
-  /**
-   * invoke CarbonDictionaryWriter to write dictionary to file.
-   *
-   * @param model       instance of DictionaryLoadModel
-   * @param columnIndex the index of current column in column list
-   * @param iter        distinct value list of dictionary
-   */
-  def writeGlobalDictionaryToFile(model: DictionaryLoadModel,
-      columnIndex: Int,
-      iter: Iterator[String]): Unit = {
-    val dictService = CarbonCommonFactory.getDictionaryService
-    val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter(
-      model.table,
-      model.columnIdentifier(columnIndex),
-      model.hdfsLocation
-    )
-    try {
-      while (iter.hasNext) {
-        writer.write(iter.next)
-      }
-    } finally {
-      writer.close()
-    }
-  }
-
-  /**
-   * read global dictionary from cache
-   */
-  def readGlobalDictionaryFromCache(model: DictionaryLoadModel): HashMap[String, Dictionary] = {
-    val dictMap = new HashMap[String, Dictionary]
-    model.primDimensions.zipWithIndex.filter(f => model.dictFileExists(f._2)).foreach { m =>
-      val dict = CarbonLoaderUtil.getDictionary(model.table,
-        m._1.getColumnIdentifier, model.hdfsLocation,
-        m._1.getDataType
-      )
-      dictMap.put(m._1.getColumnId, dict)
-    }
-    dictMap
-  }
-
-  /**
-   * invoke CarbonDictionaryReader to read dictionary from files.
-   *
-   * @param model carbon dictionary load model
-   */
-  def readGlobalDictionaryFromFile(model: DictionaryLoadModel): HashMap[String, HashSet[String]] = {
-    val dictMap = new HashMap[String, HashSet[String]]
-    val dictService = CarbonCommonFactory.getDictionaryService
-    for (i <- model.primDimensions.indices) {
-      val set = new HashSet[String]
-      if (model.dictFileExists(i)) {
-        val reader: CarbonDictionaryReader = dictService.getDictionaryReader(model.table,
-          model.columnIdentifier(i), model.hdfsLocation
-        )
-        val values = reader.read
-        if (values != null) {
-          for (j <- 0 until values.size) {
-            set.add(new String(values.get(j),
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)))
-          }
-        }
-      }
-      dictMap.put(model.primDimensions(i).getColumnId, set)
-    }
-    dictMap
-  }
-
-  def generateParserForChildrenDimension(dim: CarbonDimension,
-      format: DataFormat,
-      mapColumnValuesWithId:
-      HashMap[String, HashSet[String]],
-      generic: GenericParser): Unit = {
-    val children = dim.getListOfChildDimensions.asScala
-    for (i <- children.indices) {
-      generateParserForDimension(Some(children(i)), format.cloneAndIncreaseIndex,
-        mapColumnValuesWithId) match {
-        case Some(childDim) =>
-          generic.addChild(childDim)
-        case None =>
-      }
-    }
-  }
-
-  def generateParserForDimension(dimension: Option[CarbonDimension],
-      format: DataFormat,
-      mapColumnValuesWithId: HashMap[String, HashSet[String]]): Option[GenericParser] = {
-    dimension match {
-      case None =>
-        None
-      case Some(dim) =>
-        dim.getDataType match {
-          case DataType.ARRAY =>
-            val arrDim = ArrayParser(dim, format)
-            generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, arrDim)
-            Some(arrDim)
-          case DataType.STRUCT =>
-            val stuDim = StructParser(dim, format)
-            generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, stuDim)
-            Some(stuDim)
-          case _ =>
-            Some(PrimitiveParser(dim, mapColumnValuesWithId.get(dim.getColumnId)))
-        }
-    }
-  }
-
-  def createDataFormat(delimiters: Array[String]): DataFormat = {
-    if (ArrayUtils.isNotEmpty(delimiters)) {
-      val patterns = delimiters.map { d =>
-        Pattern.compile(if (d == null) {
-          ""
-        } else {
-          d
-        })
-      }
-      DataFormat(delimiters, 0, patterns)
-    } else {
-      null
-    }
-  }
-
-  def isHighCardinalityColumn(columnCardinality: Int,
-      rowCount: Long,
-      model: DictionaryLoadModel): Boolean = {
-    (columnCardinality > model.highCardThreshold) &&
-    (rowCount > 0) &&
-    (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
-  }
-
-  /**
-   * create a instance of DictionaryLoadModel
-   *
-   * @param carbonLoadModel carbon load model
-   * @param table           CarbonTableIdentifier
-   * @param dimensions      column list
-   * @param hdfsLocation    store location in HDFS
-   * @param dictfolderPath  path of dictionary folder
-   */
-  def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
-      table: CarbonTableIdentifier,
-      dimensions: Array[CarbonDimension],
-      hdfsLocation: String,
-      dictfolderPath: String,
-      forPreDefDict: Boolean): DictionaryLoadModel = {
-    val primDimensionsBuffer = new ArrayBuffer[CarbonDimension]
-    val isComplexes = new ArrayBuffer[Boolean]
-    for (i <- dimensions.indices) {
-      val dims = getPrimDimensionWithDict(carbonLoadModel, dimensions(i), forPreDefDict)
-      for (j <- dims.indices) {
-        primDimensionsBuffer += dims(j)
-        isComplexes += dimensions(i).isComplex
-      }
-    }
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
-    val primDimensions = primDimensionsBuffer.map { x => x }.toArray
-    val dictDetail = CarbonSparkFactory.getDictionaryDetailService().
-      getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
-    val dictFilePaths = dictDetail.dictFilePaths
-    val dictFileExists = dictDetail.dictFileExists
-    val columnIdentifier = dictDetail.columnIdentifiers
-    val hdfsTempLocation = CarbonProperties.getInstance.
-      getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, System.getProperty("java.io.tmpdir"))
-    val lockType = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
-    val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
-    // load high cardinality identify configure
-    val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
-      CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
-    val highCardThreshold = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
-      CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
-    val rowCountPercentage = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
-      CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
-
-    val serializationNullFormat =
-      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-    // get load count
-    if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CarbonDataRDDFactory.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
-    }
-    new DictionaryLoadModel(table,
-      dimensions,
-      hdfsLocation,
-      dictfolderPath,
-      dictFilePaths,
-      dictFileExists,
-      isComplexes.toArray,
-      primDimensions,
-      carbonLoadModel.getDelimiters,
-      highCardIdentifyEnable,
-      highCardThreshold,
-      rowCountPercentage,
-      columnIdentifier,
-      carbonLoadModel.getLoadMetadataDetails.size() == 0,
-      hdfsTempLocation,
-      lockType,
-      zookeeperUrl,
-      serializationNullFormat)
-  }
-
-  /**
-   * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
-   *
-   * @param sqlContext      SQLContext
-   * @param carbonLoadModel carbon data load model
-   */
-  def loadDataFrame(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel): DataFrame = {
-    val df = sqlContext.read
-      .format("com.databricks.spark.csv.newapi")
-      .option("header", {
-        if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-          "true"
-        } else {
-          "false"
-        }
-      })
-      .option("delimiter", {
-        if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-          "" + DEFAULT_SEPARATOR
-        } else {
-          carbonLoadModel.getCsvDelimiter
-        }
-      })
-      .option("parserLib", "univocity")
-      .option("escape", carbonLoadModel.getEscapeChar)
-      .option("ignoreLeadingWhiteSpace", "false")
-      .option("ignoreTrailingWhiteSpace", "false")
-      .option("codec", "gzip")
-      .option("quote", {
-        if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
-          "" + DEFAULT_QUOTE_CHARACTER
-        } else {
-          carbonLoadModel.getQuoteChar
-        }
-      })
-      .option("comment", carbonLoadModel.getCommentChar)
-      .load(carbonLoadModel.getFactFilePath)
-    df
-  }
-
-  private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      model: DictionaryLoadModel,
-      noDictDimension: Array[CarbonDimension]): Unit = {
-
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
-      model.table)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-
-    // read TableInfo
-    val tableInfo = CarbonMetastoreCatalog.readSchemaFileToThriftTable(schemaFilePath)
-
-    // modify TableInfo
-    val columns = tableInfo.getFact_table.getTable_columns
-    for (i <- 0 until columns.size) {
-      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
-        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
-      }
-    }
-
-    // write TableInfo
-    CarbonMetastoreCatalog.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
-    // update Metadata
-    val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
-    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
-      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
-    // update CarbonDataLoadSchema
-    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
-      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
-
-  }
-
-  /**
-   * check whether global dictionary have been generated successfully or not
-   *
-   * @param status checking whether the generating is  successful
-   */
-  private def checkStatus(carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      model: DictionaryLoadModel,
-      status: Array[(Int, String, Boolean)]) = {
-    var result = false
-    val noDictionaryColumns = new ArrayBuffer[CarbonDimension]
-    val tableName = model.table.getTableName
-    status.foreach { x =>
-      val columnName = model.primDimensions(x._1).getColName
-      if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(x._2)) {
-        result = true
-        LOGGER.error(s"table:$tableName column:$columnName generate global dictionary file failed")
-      }
-      if (x._3) {
-        noDictionaryColumns += model.primDimensions(x._1)
-      }
-    }
-    if (noDictionaryColumns.nonEmpty) {
-      updateTableMetadata(carbonLoadModel, sqlContext, model, noDictionaryColumns.toArray)
-    }
-    if (result) {
-      LOGGER.error("generate global dictionary files failed")
-      throw new Exception("Failed to generate global dictionary files")
-    } else {
-      LOGGER.info("generate global dictionary successfully")
-    }
-  }
-
-  /**
-   * get external columns and whose dictionary file path
-   *
-   * @param colDictFilePath external column dict file path
-   * @param table           table identifier
-   * @param dimensions      dimension columns
-   */
-  private def setPredefinedColumnDictPath(carbonLoadModel: CarbonLoadModel,
-      colDictFilePath: String,
-      table: CarbonTableIdentifier,
-      dimensions: Array[CarbonDimension]) = {
-    val colFileMapArray = colDictFilePath.split(",")
-    for (colPathMap <- colFileMapArray) {
-      val colPathMapTrim = colPathMap.trim
-      val colNameWithPath = colPathMapTrim.split(":")
-      if (colNameWithPath.length == 1) {
-        LOGGER.error("the format of external column dictionary should be " +
-                     "columnName:columnPath, please check")
-        throw new DataLoadingException("the format of predefined column dictionary" +
-                                       " should be columnName:columnPath, please check")
-      }
-      setPredefineDict(carbonLoadModel, dimensions, table, colNameWithPath(0),
-        FileUtils.getPaths(colPathMapTrim.substring(colNameWithPath(0).length + 1)))
-    }
-  }
-
-  /**
-   * set pre defined dictionary for dimension
-   *
-   * @param dimensions    all the dimensions
-   * @param table         carbon table identifier
-   * @param colName       user specified  column name for predefined dict
-   * @param colDictPath   column dictionary file path
-   * @param parentDimName parent dimenion for complex type
-   */
-  def setPredefineDict(carbonLoadModel: CarbonLoadModel,
-      dimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier,
-      colName: String,
-      colDictPath: String,
-      parentDimName: String = "") {
-    val middleDimName = colName.split("\\.")(0)
-    val dimParent = parentDimName + {
-      colName match {
-        case "" => colName
-        case _ =>
-          if (parentDimName.isEmpty) middleDimName else "." + middleDimName
-      }
-    }
-    // judge whether the column is exists
-    val preDictDimensionOption = dimensions.filter(
-      _.getColName.equalsIgnoreCase(dimParent))
-    if (preDictDimensionOption.length == 0) {
-      LOGGER.error(s"Column $dimParent is not a key column " +
-                   s"in ${ table.getDatabaseName }.${ table.getTableName }")
-      throw new DataLoadingException(s"Column $dimParent is not a key column. " +
-                                     s"Only key column can be part of dictionary " +
-                                     s"and used in COLUMNDICT option.")
-    }
-    val preDictDimension = preDictDimensionOption(0)
-    if (preDictDimension.isComplex) {
-      val children = preDictDimension.getListOfChildDimensions.asScala.toArray
-      // for Array, user set ArrayFiled: path, while ArrayField has a child Array.val
-      val currentColName = {
-        preDictDimension.getDataType match {
-          case DataType.ARRAY =>
-            if (children(0).isComplex) {
-              "val." + colName.substring(middleDimName.length + 1)
-            } else {
-              "val"
-            }
-          case _ => colName.substring(middleDimName.length + 1)
-        }
-      }
-      setPredefineDict(carbonLoadModel, children, table, currentColName,
-        colDictPath, dimParent)
-    } else {
-      carbonLoadModel.setPredefDictMap(preDictDimension, colDictPath)
-    }
-  }
-
-  /**
-   * use external dimension column to generate global dictionary
-   *
-   * @param colDictFilePath external column dict file path
-   * @param table           table identifier
-   * @param dimensions      dimension column
-   * @param carbonLoadModel carbon load model
-   * @param sqlContext      spark sql context
-   * @param hdfsLocation    store location on hdfs
-   * @param dictFolderPath  generated global dict file path
-   */
-  private def generatePredefinedColDictionary(colDictFilePath: String,
-      table: CarbonTableIdentifier,
-      dimensions: Array[CarbonDimension],
-      carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      hdfsLocation: String,
-      dictFolderPath: String) = {
-    // set pre defined dictionary column
-    setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
-    val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
-      hdfsLocation, dictFolderPath, forPreDefDict = true)
-    // new RDD to achieve distributed column dict generation
-    val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
-      sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
-      .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
-    val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
-    // check result status
-    checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
-  }
-
-  /* generate Dimension Parsers
-   *
-   * @param model
-   * @param distinctValuesList
-   * @return dimensionParsers
-   */
-  def createDimensionParsers(model: DictionaryLoadModel,
-      distinctValuesList: ArrayBuffer[(Int, HashSet[String])]): Array[GenericParser] = {
-    // local combine set
-    val dimNum = model.dimensions.length
-    val primDimNum = model.primDimensions.length
-    val columnValues = new Array[HashSet[String]](primDimNum)
-    val mapColumnValuesWithId = new HashMap[String, HashSet[String]]
-    for (i <- 0 until primDimNum) {
-      columnValues(i) = new HashSet[String]
-      distinctValuesList += ((i, columnValues(i)))
-      mapColumnValuesWithId.put(model.primDimensions(i).getColumnId, columnValues(i))
-    }
-    val dimensionParsers = new Array[GenericParser](dimNum)
-    for (j <- 0 until dimNum) {
-      dimensionParsers(j) = GlobalDictionaryUtil.generateParserForDimension(
-        Some(model.dimensions(j)),
-        GlobalDictionaryUtil.createDataFormat(model.delimiters),
-        mapColumnValuesWithId).get
-    }
-    dimensionParsers
-  }
-
-  /**
-   * parse records in dictionary file and validate record
-   *
-   * @param x
-   * @param accum
-   * @param csvFileColumns
-   */
-  private def parseRecord(x: String, accum: Accumulator[Int],
-      csvFileColumns: Array[String]): (String, String) = {
-    val tokens = x.split("" + DEFAULT_SEPARATOR)
-    var columnName: String = ""
-    var value: String = ""
-    // such as "," , "", throw ex
-    if (tokens.isEmpty) {
-      LOGGER.error("Read a bad dictionary record: " + x)
-      accum += 1
-    } else if (tokens.size == 1) {
-      // such as "1", "jone", throw ex
-      if (!x.contains(",")) {
-        accum += 1
-      } else {
-        try {
-          columnName = csvFileColumns(tokens(0).toInt)
-        } catch {
-          case ex: Exception =>
-            LOGGER.error("Read a bad dictionary record: " + x)
-            accum += 1
-        }
-      }
-    } else {
-      try {
-        columnName = csvFileColumns(tokens(0).toInt)
-        value = tokens(1)
-      } catch {
-        case ex: Exception =>
-          LOGGER.error("Read a bad dictionary record: " + x)
-          accum += 1
-      }
-    }
-    (columnName, value)
-  }
-
-  /**
-   * read local dictionary and prune column
-   *
-   * @param sqlContext
-   * @param csvFileColumns
-   * @param requireColumns
-   * @param allDictionaryPath
-   * @return allDictionaryRdd
-   */
-  private def readAllDictionaryFiles(sqlContext: SQLContext,
-      csvFileColumns: Array[String],
-      requireColumns: Array[String],
-      allDictionaryPath: String,
-      accumulator: Accumulator[Int]) = {
-    var allDictionaryRdd: RDD[(String, Iterable[String])] = null
-    try {
-      // read local dictionary file, and spilt (columnIndex, columnValue)
-      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
-        .map(x => parseRecord(x, accumulator, csvFileColumns)).persist()
-
-      // group by column index, and filter required columns
-      val requireColumnsList = requireColumns.toList
-      allDictionaryRdd = basicRdd
-        .groupByKey()
-        .filter(x => requireColumnsList.contains(x._1))
-    } catch {
-      case ex: Exception =>
-        LOGGER.error("Read dictionary files failed. Caused by: " + ex.getMessage)
-        throw ex
-    }
-    allDictionaryRdd
-  }
-
-  /**
-   * validate local dictionary files
-   *
-   * @param allDictionaryPath
-   * @return (isNonempty, isDirectory)
-   */
-  private def validateAllDictionaryPath(allDictionaryPath: String): Boolean = {
-    val fileType = FileFactory.getFileType(allDictionaryPath)
-    val filePath = FileFactory.getCarbonFile(allDictionaryPath, fileType)
-    // filepath regex, look like "/path/*.dictionary"
-    if (filePath.getName.startsWith("*")) {
-      val dictExt = filePath.getName.substring(1)
-      if (filePath.getParentFile.exists()) {
-        val listFiles = filePath.getParentFile.listFiles()
-        if (listFiles.exists(file =>
-          file.getName.endsWith(dictExt) && file.getSize > 0)) {
-          true
-        } else {
-          LOGGER.warn("No dictionary files found or empty dictionary files! " +
-                      "Won't generate new dictionary.")
-          false
-        }
-      } else {
-        throw new FileNotFoundException(
-          "The given dictionary file path is not found!")
-      }
-    } else {
-      if (filePath.exists()) {
-        if (filePath.getSize > 0) {
-          true
-        } else {
-          LOGGER.warn("No dictionary files found or empty dictionary files! " +
-                      "Won't generate new dictionary.")
-          false
-        }
-      } else {
-        throw new FileNotFoundException(
-          "The given dictionary file path is not found!")
-      }
-    }
-  }
-
-  /**
-   * get file headers from fact file
-   *
-   * @param carbonLoadModel
-   * @return headers
-   */
-  private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = {
-    var headers: Array[String] = null
-    val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0)
-    val readLine = CarbonUtil.readHeader(factFile)
-
-    if (null != readLine) {
-      val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-        "" + DEFAULT_SEPARATOR
-      } else {
-        carbonLoadModel.getCsvDelimiter
-      }
-      headers = readLine.toLowerCase().split(delimiter)
-    } else {
-      LOGGER.error("Not found file header! Please set fileheader")
-      throw new IOException("Failed to get file header")
-    }
-    headers
-  }
-
-  /**
-   * generate global dictionary with SQLContext and CarbonLoadModel
-   *
-   * @param sqlContext      sql context
-   * @param carbonLoadModel carbon load model
-   */
-  def generateGlobalDictionary(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      storePath: String,
-      dataFrame: Option[DataFrame] = None): Unit = {
-    try {
-      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-      // create dictionary folder if not exists
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-      val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
-      // columns which need to generate global dictionary file
-      val dimensions = carbonTable.getDimensionByTableName(
-        carbonTable.getFactTableName).asScala.toArray
-      // generate global dict from pre defined column dict file
-      carbonLoadModel.initPredefDictMap()
-
-      val allDictionaryPath = carbonLoadModel.getAllDictPath
-      if (StringUtils.isEmpty(allDictionaryPath)) {
-        LOGGER.info("Generate global dictionary from source data files!")
-        // load data by using dataSource com.databricks.spark.csv
-        var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
-        var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-          df.columns
-        } else {
-          carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
-        }
-        headers = headers.map(headerName => headerName.trim)
-        val colDictFilePath = carbonLoadModel.getColDictFilePath
-        if (colDictFilePath != null) {
-          // generate predefined dictionary
-          generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-            dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
-        }
-        if (headers.length > df.columns.length) {
-          val msg = "The number of columns in the file header do not match the " +
-                    "number of columns in the data file; Either delimiter " +
-                    "or fileheader provided is not correct"
-          LOGGER.error(msg)
-          throw new DataLoadingException(msg)
-        }
-        // use fact file to generate global dict
-        val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
-          headers, df.columns)
-        if (requireDimension.nonEmpty) {
-          // select column to push down pruning
-          df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
-          val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-            requireDimension, storePath, dictfolderPath, false)
-          // combine distinct value in a block and partition by column
-          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
-            .partitionBy(new ColumnPartitioner(model.primDimensions.length))
-          // generate global dictionary files
-          val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
-          // check result status
-          checkStatus(carbonLoadModel, sqlContext, model, statusList)
-        } else {
-          LOGGER.info("No column found for generating global dictionary in source data files")
-        }
-        // generate global dict from dimension file
-        if (carbonLoadModel.getDimFolderPath != null) {
-          val fileMapArray = carbonLoadModel.getDimFolderPath.split(",")
-          for (fileMap <- fileMapArray) {
-            val dimTableName = fileMap.split(":")(0)
-            var dimDataframe = loadDataFrame(sqlContext, carbonLoadModel)
-            val (requireDimensionForDim, requireColumnNamesForDim) =
-              pruneDimensions(dimensions, dimDataframe.columns, dimDataframe.columns)
-            if (requireDimensionForDim.length >= 1) {
-              dimDataframe = dimDataframe.select(requireColumnNamesForDim.head,
-                requireColumnNamesForDim.tail: _*)
-              val modelforDim = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-                requireDimensionForDim, storePath, dictfolderPath, false)
-              val inputRDDforDim = new CarbonBlockDistinctValuesCombineRDD(
-                dimDataframe.rdd, modelforDim)
-                .partitionBy(new ColumnPartitioner(modelforDim.primDimensions.length))
-              val statusListforDim = new CarbonGlobalDictionaryGenerateRDD(
-                inputRDDforDim, modelforDim).collect()
-              checkStatus(carbonLoadModel, sqlContext, modelforDim, statusListforDim)
-            } else {
-              LOGGER.info(s"No columns in dimension table $dimTableName " +
-                          "to generate global dictionary")
-            }
-          }
-        }
-      } else {
-        LOGGER.info("Generate global dictionary from dictionary files!")
-        val isNonempty = validateAllDictionaryPath(allDictionaryPath)
-        if (isNonempty) {
-          var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-            getHeaderFormFactFile(carbonLoadModel)
-          } else {
-            carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
-          }
-          headers = headers.map(headerName => headerName.trim)
-          // prune columns according to the CSV file header, dimension columns
-          val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
-          if (requireDimension.nonEmpty) {
-            val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-              requireDimension, storePath, dictfolderPath, false)
-            // check if dictionary files contains bad record
-            val accumulator = sqlContext.sparkContext.accumulator(0)
-            // read local dictionary file, and group by key
-            val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
-              requireColumnNames, allDictionaryPath, accumulator)
-            // read exist dictionary and combine
-            val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
-              .partitionBy(new ColumnPartitioner(model.primDimensions.length))
-            // generate global dictionary files
-            val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
-            // check result status
-            checkStatus(carbonLoadModel, sqlContext, model, statusList)
-            // if the dictionary contains wrong format record, throw ex
-            if (accumulator.value > 0) {
-              throw new DataLoadingException("Data Loading failure, dictionary values are " +
-                                             "not in correct format!")
-            }
-          } else {
-            LOGGER.info("have no column need to generate global dictionary")
-          }
-        }
-      }
-    } catch {
-      case ex: Exception =>
-        LOGGER.error(ex, "generate global dictionary failed")
-        throw ex
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
deleted file mode 100644
index 3acde94..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark._
-
-case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
-
-class DataLoadCoalescedRDD[T: ClassTag](
-  @transient var prev: RDD[T],
-  nodeList: Array[String])
-    extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) with Logging {
-
-  override def getPartitions: Array[Partition] = {
-    new DataLoadPartitionCoalescer(prev, nodeList).run
-  }
-
-  override def compute(split: Partition,
-      context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
-
-    new Iterator[DataLoadPartitionWrap[T]] {
-      val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
-      def hasNext = iter.hasNext
-      def next: DataLoadPartitionWrap[T] = {
-        DataLoadPartitionWrap(firstParent[T], iter.next())
-      }
-    }
-  }
-
-  override def getDependencies: Seq[Dependency[_]] = {
-    Seq(new NarrowDependency(prev) {
-      def getParents(id: Int): Seq[Int] =
-        partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
-    })
-  }
-
-  override def clearDependencies() {
-    super.clearDependencies()
-    prev = null
-  }
-
-  /**
-   * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
-   * then the preferred machine will be one which most parent splits prefer too.
-   * @param partition
-   * @return the machine most preferred by split
-   */
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
deleted file mode 100644
index 8e0971c..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.rdd
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.LinkedHashSet
-
-import org.apache.spark.Logging
-import org.apache.spark.Partition
-import org.apache.spark.scheduler.TaskLocation
-
-/**
- * DataLoadPartitionCoalescer
- * Repartition the partitions of rdd to few partitions, one partition per node.
- * exmaple:
- * blk_hst  host1 host2 host3 host4 host5
- * block1   host1 host2 host3
- * block2         host2       host4 host5
- * block3               host3 host4 host5
- * block4   host1 host2       host4
- * block5   host1       host3 host4
- * block6   host1 host2             host5
- * -------------------------------------------------------
- * 1. sort host by number of blocks
- * -------------------------------------------------------
- * host3: block1 block3 block5
- * host5: block2 block3 block6
- * host1: block1 block4 block5 block6
- * host2: block1 block2 block4 block6
- * host4: block2 block3 block4 block5
- * -------------------------------------------------------
- * 2. sort blocks of each host1
- * new partitions are before old partitions
- * -------------------------------------------------------
- * host3:                      block1 block3 block5
- * host5:        block2 block6+block3
- * host1: block4+block1 block5 block6
- * host2: block1 block2 block4 block6
- * host4: block2 block3 block4 block5
- * -------------------------------------------------------
- * 3. assign blocks to host
- * -------------------------------------------------------
- * step1: host3 choose block1, remove from host1, host2
- * step2: host5 choose block2, remove from host2, host4
- * step3: host1 choose block4, .....
- * -------------------------------------------------------
- * result:
- * host3:                      block1       block5
- * host5:        block2
- * host1: block4
- * host2:                      block6
- * host4:        block3
- */
-class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) extends Logging {
-
-  val prevPartitions = prev.partitions
-  var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
-  // host => partition id list
-  val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
-  // partition id => host list
-  val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
-  val noLocalityPartitions = new ArrayBuffer[Int]
-  var noLocality = true
-  /**
-   * assign a task location for a partition
-   */
-  private def getLocation(index: Int): Option[String] = {
-    if (index < nodeList.length) {
-      Some(nodeList(index))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * collect partitions to each node
-   */
-  private def groupByNode(): Unit = {
-    // initialize hostMapPartitionIds
-    nodeList.foreach { node =>
-      val map = new LinkedHashSet[Int]
-      hostMapPartitionIds.put(node, map)
-    }
-    // collect partitions for each node
-    val tmpNoLocalityPartitions = new ArrayBuffer[Int]
-    prevPartitions.foreach { p =>
-      val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
-      if (locs.isEmpty) {
-        // if a partition has no location, add to noLocalityPartitions
-        tmpNoLocalityPartitions += p.index
-      } else {
-        // add partion to hostMapPartitionIds and partitionIdMapHosts
-        locs.foreach { loc =>
-          val host = loc.host
-          hostMapPartitionIds.get(host) match {
-            // if the location of the partition is not in node list,
-            // will add this partition to noLocalityPartitions
-            case None => tmpNoLocalityPartitions += p.index
-            case Some(ids) =>
-              noLocality = false
-              ids += p.index
-              partitionIdMapHosts.get(p.index) match {
-                case None =>
-                  val hosts = new ArrayBuffer[String]
-                  hosts += host
-                  partitionIdMapHosts.put(p.index, hosts)
-                case Some(hosts) =>
-                  hosts += host
-              }
-          }
-        }
-      }
-    }
-
-    // remove locality partition
-    tmpNoLocalityPartitions.distinct.foreach {index =>
-      partitionIdMapHosts.get(index) match {
-        case None => noLocalityPartitions += index
-        case Some(_) =>
-      }
-    }
-  }
-
-  /**
-   * sort host and partitions
-   */
-  private def sortHostAndPartitions(hostMapPartitionIdsSeq: Seq[(String, LinkedHashSet[Int])]) = {
-    val oldPartitionIdSet = new HashSet[Int]
-    // sort host by number of partitions
-    hostMapPartitionIdsSeq.sortBy(_._2.size).map { loc =>
-      // order: newPartitionIds + oldPartitionIds
-      val sortedPartitionIdSet = new LinkedHashSet[Int]
-      var newPartitionIds = new ArrayBuffer[Int]
-      var oldPartitionIds = new ArrayBuffer[Int]
-      loc._2.foreach { p =>
-        if (oldPartitionIdSet.contains(p)) {
-          oldPartitionIds += p
-        } else {
-          newPartitionIds += p
-          oldPartitionIdSet.add(p)
-        }
-      }
-      // sort and add new partitions
-      newPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
-      // sort and add old partitions
-      oldPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
-      // update hostMapPartitionIds
-      hostMapPartitionIds.put(loc._1, sortedPartitionIdSet)
-      (loc._1, sortedPartitionIdSet)
-    }.toArray
-  }
-
-  /**
-   *  assign locality partition to each host
-   */
-  private def assignPartitonNodeLocality(
-      noEmptyHosts: Seq[(String, LinkedHashSet[Int])]): Array[ArrayBuffer[Int]] = {
-    val localityResult = new Array[ArrayBuffer[Int]](noEmptyHosts.length)
-    for (i <- 0 until localityResult.length) {
-      localityResult(i) = new ArrayBuffer[Int]
-    }
-    val noEmptyHostSet = new HashSet[String]
-    noEmptyHosts.foreach {loc => noEmptyHostSet.add(loc._1)}
-
-    var hostIndex = 0
-    while (noEmptyHostSet.nonEmpty) {
-      val hostEntry = noEmptyHosts(hostIndex)
-      if (noEmptyHostSet.contains(hostEntry._1)) {
-        if (hostEntry._2.nonEmpty) {
-          var partitionId = hostEntry._2.iterator.next
-          localityResult(hostIndex) += partitionId
-          // remove from sortedParts
-          partitionIdMapHosts.get(partitionId) match {
-            case Some(locs) =>
-              locs.foreach { loc =>
-                hostMapPartitionIds.get(loc) match {
-                  case Some(parts) =>
-                    parts.remove(partitionId)
-                }
-              }
-          }
-        } else {
-          noEmptyHostSet.remove(hostEntry._1)
-        }
-      }
-
-      hostIndex = hostIndex + 1
-      if (hostIndex == noEmptyHosts.length) {
-        hostIndex = 0
-      }
-    }
-    localityResult
-  }
-
-  /**
-   * assign no locality partitions to each host
-   */
-  private def assignPartitionNoLocality(emptyHosts: mutable.Buffer[String],
-      noEmptyHosts: mutable.Buffer[String],
-      localityResult: mutable.Buffer[ArrayBuffer[Int]]): Array[ArrayBuffer[Int]] = {
-    val noLocalityResult = new Array[ArrayBuffer[Int]](emptyHosts.length)
-    logInfo(s"non empty host: ${noEmptyHosts.length}, empty host: ${emptyHosts.length}")
-    val avgNumber = prevPartitions.length / (noEmptyHosts.length + emptyHosts.length)
-    for (i <- 0 until noLocalityResult.length) {
-      noLocalityResult(i) = new ArrayBuffer[Int]
-    }
-    var noLocalityPartitionIndex = 0
-    if (noLocalityPartitions.nonEmpty) {
-      if (emptyHosts.nonEmpty) {
-        // at first, assign avg number to empty node
-        for (i <- 0 until avgNumber) {
-          noLocalityResult.foreach { partitionIds =>
-            if (noLocalityPartitionIndex < noLocalityPartitions.length) {
-              partitionIds += noLocalityPartitions(noLocalityPartitionIndex)
-              noLocalityPartitionIndex = noLocalityPartitionIndex + 1
-            }
-          }
-        }
-      }
-      // still have no locality partitions
-      // assign to all hosts
-      if (noLocalityPartitionIndex < noLocalityPartitions.length) {
-        var partIndex = 0
-        for (i <- noLocalityPartitionIndex until noLocalityPartitions.length) {
-          if (partIndex < localityResult.length) {
-            localityResult(partIndex) += noLocalityPartitions(i)
-          } else {
-            noLocalityResult(partIndex - localityResult.length) += noLocalityPartitions(i)
-          }
-          partIndex = partIndex + 1
-          if (partIndex == localityResult.length + noLocalityResult.length) {
-            partIndex = 0
-          }
-        }
-      }
-    }
-    noLocalityResult
-  }
-
-  /**
-   * no locality repartition
-   */
-  private def repartitionNoLocality(): Array[Partition] = {
-    // no locality repartition
-    logInfo("no locality partition")
-    val prevPartIndexs = new Array[ArrayBuffer[Int]](numOfParts)
-    for (i <- 0 until numOfParts) {
-      prevPartIndexs(i) = new ArrayBuffer[Int]
-    }
-    for (i <- 0 until prevPartitions.length) {
-      prevPartIndexs(i % numOfParts) += prevPartitions(i).index
-    }
-    prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x =>
-      new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
-    }
-  }
-
-  private def repartitionLocality(): Array[Partition] = {
-    logInfo("locality partition")
-    val hostMapPartitionIdsSeq = hostMapPartitionIds.toSeq
-    // empty host seq
-    val emptyHosts = hostMapPartitionIdsSeq.filter(_._2.isEmpty).map(_._1).toBuffer
-    // non empty host array
-    var tempNoEmptyHosts = hostMapPartitionIdsSeq.filter(_._2.nonEmpty)
-
-    // 1. do locality repartition
-    // sort host and partitions
-    tempNoEmptyHosts = sortHostAndPartitions(tempNoEmptyHosts)
-    // assign locality partition to non empty hosts
-    val templocalityResult = assignPartitonNodeLocality(tempNoEmptyHosts)
-    // collect non empty hosts and empty hosts
-    val noEmptyHosts = mutable.Buffer[String]()
-    val localityResult = mutable.Buffer[ArrayBuffer[Int]]()
-    for(index <- 0 until templocalityResult.size) {
-      if (templocalityResult(index).isEmpty) {
-        emptyHosts += tempNoEmptyHosts(index)._1
-      } else {
-        noEmptyHosts += tempNoEmptyHosts(index)._1
-        localityResult += templocalityResult(index)
-      }
-    }
-    // 2. do no locality repartition
-    // assign no locality partitions to all hosts
-    val noLocalityResult = assignPartitionNoLocality(emptyHosts, noEmptyHosts, localityResult)
-
-    // 3. generate CoalescedRDDPartition
-    (0 until localityResult.length + noLocalityResult.length).map { index =>
-      val ids = if (index < localityResult.length) {
-        localityResult(index).toArray
-      } else {
-        noLocalityResult(index - localityResult.length).toArray
-      }
-      val loc = if (index < localityResult.length) {
-        Some(noEmptyHosts(index))
-      } else {
-        Some(emptyHosts(index - localityResult.length))
-      }
-      logInfo(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ")
-      new CoalescedRDDPartition(index, prev, ids, loc)
-    }.filter(_.parentsIndices.nonEmpty).toArray
-
-  }
-
-  def run(): Array[Partition] = {
-    // 1. group partitions by node
-    groupByNode()
-    logInfo(s"partition: ${prevPartitions.length}, no locality: ${noLocalityPartitions.length}")
-    val partitions = if (noLocality) {
-      // 2.A no locality partition
-      repartitionNoLocality()
-    } else {
-      // 2.B locality partition
-      repartitionLocality()
-    }
-    DataLoadPartitionCoalescer.checkPartition(prevPartitions, partitions)
-    partitions
-  }
-}
-
-object DataLoadPartitionCoalescer {
-  def getPreferredLocs(prev: RDD[_], p: Partition): Seq[TaskLocation] = {
-    prev.context.getPreferredLocs(prev, p.index)
-  }
-
-  def getParentsIndices(p: Partition): Array[Int] = {
-    p.asInstanceOf[CoalescedRDDPartition].parentsIndices
-  }
-
-  def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
-    val prevPartIds = new ArrayBuffer[Int]
-    parts.foreach{ p =>
-      prevPartIds ++= DataLoadPartitionCoalescer.getParentsIndices(p)
-    }
-    // all partitions must be arranged once.
-    assert(prevPartIds.size == prevParts.size)
-    val prevPartIdsMap = prevPartIds.map{ id =>
-      (id, id)
-    }.toMap
-    prevParts.foreach{ p =>
-      prevPartIdsMap.get(p.index) match {
-        case None => assert(false, "partition " + p.index + " not found")
-        case Some(_) =>
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index cece5e6..d394bfa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,15 +17,15 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types._
 
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+
 /**
  * Top command
  */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index ba97083..de43319 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -57,8 +57,10 @@ class CarbonContext(
 
   CarbonContext.addInstance(sc, this)
   CodeGenerateFactory.init(sc.version)
+  CarbonEnv.init(this)
 
   var lastSchemaUpdatedTime = System.currentTimeMillis()
+  val hiveClientInterface = metadataHive
 
   protected[sql] override lazy val conf: SQLConf = new CarbonSQLConf
 
@@ -66,7 +68,7 @@ class CarbonContext(
   override lazy val catalog = {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
-    new CarbonMetastoreCatalog(this, storePath, metadataHive, queryId) with OverrideCatalog
+    new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
   }
 
   @transient

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index ca0ad58..bb1c4ef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -31,8 +31,6 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.{DistributionUtil, TableMeta}
 import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -43,9 +41,8 @@ import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, Carbon
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.scan.expression.logical.AndExpression
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
+import org.apache.carbondata.spark.merger.TableMeta
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
-import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-
 
 private[sql] case class CarbonDatasourceHadoopRelation(
   sqlContext: SQLContext,
@@ -71,15 +68,7 @@ private[sql] case class CarbonDatasourceHadoopRelation(
       carbonTable.getDatabaseName,
       carbonTable.getFactTableName,
       CarbonSparkUtil.createSparkMeta(carbonTable),
-      TableMeta(absIdentifier.getCarbonTableIdentifier,
-        paths.head,
-        carbonTable,
-        Partitioner(options.partitionClass,
-          Array(""),
-          options.partitionCount.toInt,
-          DistributionUtil.getNodeList(sqlContext.sparkContext)
-        )
-      ),
+      new TableMeta(absIdentifier.getCarbonTableIdentifier, paths.head, carbonTable),
       None
     )(sqlContext)
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index a06d5cb..d898c4f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -23,12 +23,11 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes, TableMeta}
+import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -37,6 +36,7 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.spark.{CarbonOption, _}
+import org.apache.carbondata.spark.merger.TableMeta
 
 /**
  * Carbon relation provider compliant to data source api.
@@ -135,8 +135,8 @@ private[sql] case class CarbonDatasourceRelation(
     extends BaseRelation with Serializable {
 
   lazy val carbonRelation: CarbonRelation = {
-    CarbonEnv.getInstance(context)
-        .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
+    CarbonEnv.get
+        .carbonMetastore.lookupRelation1(tableIdentifier, None)(sqlContext)
         .asInstanceOf[CarbonRelation]
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index f4fe900..397d479 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, CarbonMetastoreTypes}
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonMetastoreTypes}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
@@ -35,10 +35,10 @@ import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.querystatistics._
 import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 /**
- * It decodes the data.
- *
+ * It decodes the dictionary key to value
  */
 case class CarbonDictionaryDecoder(
     relations: Seq[CarbonDecoderRelation],
@@ -48,7 +48,6 @@ case class CarbonDictionaryDecoder(
   (@transient sqlContext: SQLContext)
   extends UnaryNode {
 
-
   override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil
 
   override val output: Seq[Attribute] = {
@@ -149,11 +148,9 @@ case class CarbonDictionaryDecoder(
 
   override def canProcessSafeRows: Boolean = true
 
-
-
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
-      val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog].storePath
+      val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastore].storePath
       val queryId = sqlContext.getConf("queryId", System.nanoTime() + "")
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index be77954..6cfbd5f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,51 +17,31 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, HiveContext}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.spark.sql.hive.CarbonMetastore
 
 /**
  * Carbon Environment for unified context
  */
-case class CarbonEnv(hiveContext: HiveContext, carbonCatalog: CarbonMetastoreCatalog)
+case class CarbonEnv(carbonMetastore: CarbonMetastore)
 
 object CarbonEnv {
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  var carbonEnv: CarbonEnv = _
 
-  def getInstance(sqlContext: SQLContext): CarbonEnv = {
-    if (carbonEnv == null) {
-      carbonEnv =
-        CarbonEnv(sqlContext.asInstanceOf[CarbonContext],
-          sqlContext.asInstanceOf[CarbonContext].catalog)
+  @volatile private var carbonEnv: CarbonEnv = _
+
+  var initialized = false
+
+  def init(sqlContext: SQLContext): Unit = {
+    if (!initialized) {
+      val cc = sqlContext.asInstanceOf[CarbonContext]
+      val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
+      carbonEnv = CarbonEnv(catalog)
+      initialized = true
     }
-    carbonEnv
   }
 
-  /**
-   *
-   * Requesting the extra executors other than the existing ones.
-   *
-   * @param sc
-   * @param numExecutors
-   * @return
-   */
-  final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
-    sc.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
-        val requiredExecutors = numExecutors - b.numExistingExecutors
-        LOGGER.info(s"number of executors is =$numExecutors existing executors are =" +
-                s"${ b.numExistingExecutors }")
-        if (requiredExecutors > 0) {
-          b.requestExecutors(requiredExecutors)
-        }
-        true
-      case _ =>
-        false
-    }
+  def get: CarbonEnv = {
+    if (initialized) carbonEnv
+    else throw new RuntimeException("CarbonEnv not initialized")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 6580c4f..3fe4f22 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+import org.apache.spark.sql.hive.CarbonMetastore
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -36,14 +36,14 @@ import org.apache.carbondata.spark.CarbonFilters
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 case class CarbonScan(
-    var attributesRaw: Seq[Attribute],
+    var columnProjection: Seq[Attribute],
     relationRaw: CarbonRelation,
     dimensionPredicatesRaw: Seq[Expression],
     useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
   val carbonTable = relationRaw.metaData.carbonTable
   val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
   val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
-  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
+  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastore]
 
   val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
   val unprocessedExprs = new ArrayBuffer[Expression]()
@@ -79,21 +79,21 @@ case class CarbonScan(
 
   private def processExtraAttributes(plan: CarbonQueryPlan) {
     if (attributesNeedToDecode.size() > 0) {
-      val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+      val attributeOut = new ArrayBuffer[Attribute]() ++ columnProjection
 
       attributesNeedToDecode.asScala.foreach { attr =>
-        if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
+        if (!columnProjection.exists(_.name.equalsIgnoreCase(attr.name))) {
           attributeOut += attr
         }
       }
-      attributesRaw = attributeOut
+      columnProjection = attributeOut
     }
 
     val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
     val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
     val dimAttr = new Array[Attribute](dimensions.size())
     val msrAttr = new Array[Attribute](measures.size())
-    attributesRaw.foreach { attr =>
+    columnProjection.foreach { attr =>
       val carbonDimension =
         carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
       if(carbonDimension != null) {
@@ -107,10 +107,10 @@ case class CarbonScan(
       }
     }
 
-    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+    columnProjection = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
 
     var queryOrder: Integer = 0
-    attributesRaw.foreach { attr =>
+    columnProjection.foreach { attr =>
       val carbonDimension =
         carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
       if (carbonDimension != null) {
@@ -137,22 +137,11 @@ case class CarbonScan(
   }
 
   def inputRdd: CarbonScanRDD[Array[Any]] = {
-
-    val conf = new Configuration()
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
-    // setting queryid
-    buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
-    val tableCreationTime = carbonCatalog
-        .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
-    val schemaLastUpdatedTime = carbonCatalog
-        .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
     new CarbonScanRDD(
       ocRaw.sparkContext,
-      attributesRaw,
+      columnProjection,
       buildCarbonPlan.getFilterExpression,
-      absoluteTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier,
       carbonTable
     )
   }
@@ -180,7 +169,7 @@ case class CarbonScan(
   }
 
   def output: Seq[Attribute] = {
-    attributesRaw
+    columnProjection
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
new file mode 100644
index 0000000..4320598
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
+
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+
+case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
+
+object CarbonSparkUtil {
+
+  def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
+    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+        .asScala.map(x => x.getColName) // wf : may be problem
+    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+        .asScala.map(x => x.getColName)
+    val dictionary =
+      carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
+        (f.getColName.toLowerCase,
+            f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+                !CarbonUtil.hasComplexDataType(f.getDataType))
+      }
+    CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 62f1d4c..47fa82e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -428,7 +428,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
             throw new MalformedCarbonCommandException("Invalid table properties")
           }
           // prepare table model of the collected tokens
-          val tableModel: tableModel = prepareTableModel(ifNotExistPresent,
+          val tableModel: TableModel = prepareTableModel(ifNotExistPresent,
             dbName,
             tableName,
             fields,
@@ -502,7 +502,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
   protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
       , tableName: String, fields: Seq[Field],
       partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]): tableModel
+      tableProperties: Map[String, String]): TableModel
   = {
 
     val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
@@ -530,11 +530,10 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
 
-    val partitioner: Option[Partitioner] = getPartitionerObject(partitionCols, tableProperties)
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
 
-    tableModel(
+    TableModel(
       ifNotExistPresent,
       dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
       dbName,
@@ -544,7 +543,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
       msrs.map(f => normalizeType(f)),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
-      partitioner,
       groupCols,
       Some(colProps))
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
deleted file mode 100644
index 7884f9d..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-
-/**
- * Implicit functions for [TableIdentifier]
- */
-object CarbonTableIdentifierImplicit {
-  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
-
-  implicit def toTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
-    tableIdentifier match {
-      case Seq(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
-      case Seq(tableName) => TableIdentifier(tableName, None)
-      case _ => throw new NoSuchTableException
-    }
-  }
-
-  implicit def toSequence(tableIdentifier: TableIdentifier): Seq[String] = {
-    tableIdentifier.database match {
-      case Some(dbName) => Seq(dbName, tableIdentifier.table)
-      case _ => Seq(tableIdentifier.table)
-    }
-  }
-}