You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:29 UTC

[42/50] [abbrv] incubator-carbondata git commit: Merge remote-tracking branch 'carbon_master/master' into apache/master

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index bbc9f25,9f534c1..4d39eb2
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@@ -52,16 -51,14 +52,18 @@@ class CarbonSourc
    override def createRelation(
        sqlContext: SQLContext,
        parameters: Map[String, String]): BaseRelation = {
-     parameters.get("path") match {
-       case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters)
-       case _ =>
-         val options = new CarbonOption(parameters)
-         val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
-         val ident = tableIdentifier match {
-           case Seq(name) => TableIdentifier(name, None)
-           case Seq(db, name) => TableIdentifier(name, Some(db))
-         }
-         CarbonDatasourceRelation(ident, None)(sqlContext)
+     if (parameters.get("tablePath") != None) {
+       val options = new CarbonOption(parameters)
+       val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
 -      CarbonDatasourceRelation(tableIdentifier, None)(sqlContext)
++      val ident = tableIdentifier match {
++        case Seq(name) => TableIdentifier(name, None)
++        case Seq(db, name) => TableIdentifier(name, Some(db))
++      }
++      CarbonDatasourceRelation(ident, None)(sqlContext)
+     } else if (parameters.get("path") != None) {
+       CarbonDatasourceHadoopRelation(sqlContext, Array(parameters.get("path").get), parameters)
+     } else {
+       sys.error("Carbon table path not found")
      }
  
    }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 05149eb,ce43c4f..8a9f1c9
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@@ -29,11 -29,10 +29,11 @@@ import org.apache.spark.unsafe.types.UT
  
  import org.carbondata.core.cache.{Cache, CacheProvider, CacheType}
  import org.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
- import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+ import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
  import org.carbondata.core.carbon.metadata.datatype.DataType
  import org.carbondata.core.carbon.metadata.encoder.Encoding
 -import org.carbondata.query.carbon.util.DataTypeUtil
 +import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 +import org.carbondata.scan.util.DataTypeUtil
  
  /**
   * It decodes the data.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 4209c5a,b340884..406b025
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@@ -1325,13 -1186,12 +1186,13 @@@ class CarbonSqlParser(
      }
  
    protected lazy val segmentId: Parser[String] =
 -    ( numericLit ^^ { u => u } |
 -      elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
 -      )
 +    numericLit ^^ { u => u } |
 +      elem("decimal", p => {
 +        p.getClass.getSimpleName.equals("FloatLit") ||
 +        p.getClass.getSimpleName.equals("DecimalLit") } ) ^^ (_.chars)
  
    protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
-     DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> (CUBE | TABLE) ~>
+     DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
        (ident <~ ".").? ~ ident) <~
        opt(";") ^^ {
        case loadids ~ cube => cube match {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index cb94f72,75abe0e..b8afcdf
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@@ -29,8 -29,10 +29,11 @@@ import scala.util.Rando
  
  import org.apache.spark.SparkEnv
  import org.apache.spark.sql._
 +import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
  import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
  import org.apache.spark.sql.hive.HiveContext
  import org.apache.spark.sql.types.TimestampType
@@@ -1244,7 -1269,8 +1266,8 @@@ private[sql] case class CreateCube(cm: 
        try {
          sqlContext.sql(
            s"""CREATE TABLE $dbName.$tbName USING org.apache.spark.sql.CarbonSource""" +
-           s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$cubePath") """).collect
 -          s""" OPTIONS (cubename "$dbName.$tbName", tablePath "$tablePath") """)
++          s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
+               .collect
        } catch {
          case e: Exception =>
  
@@@ -1290,21 -1317,23 +1314,21 @@@ private[sql] case class DeleteLoadsById
  
      // validate load ids first
      validateLoadIds
 +    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
  
 +    val identifier = TableIdentifier(tableName, Option(dbName))
      val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
 -      Option(schemaName),
 -      tableName,
 -      None)(sqlContext).asInstanceOf[CarbonRelation]
 +      identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
      if (relation == null) {
-       LOGGER.audit(s"The delete load by Id is failed. Table $dbName.$tableName does not exist")
 -      LOGGER.audit(s"Delete load by Id is failed. Table $schemaName.$tableName does not exist")
 -      sys.error(s"Table $schemaName.$tableName does not exist")
++      LOGGER.audit(s"Delete load by Id is failed. Table $dbName.$tableName does not exist")
 +      sys.error(s"Table $dbName.$tableName does not exist")
      }
  
 -    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(schemaName + '_' + tableName)
 +    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
  
      if (null == carbonTable) {
 -      CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
 -        Option(schemaName),
 -        tableName,
 -        None)(sqlContext).asInstanceOf[CarbonRelation]
 +      CarbonEnv.getInstance(sqlContext).carbonCatalog
 +        .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
      }
      val path = carbonTable.getMetaDataFilepath
  
@@@ -1356,20 -1375,29 +1370,26 @@@ private[sql] case class DeleteLoadsByLo
  
    def run(sqlContext: SQLContext): Seq[Row] = {
  
 -    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
 -    LOGGER.audit(s"Delete load by load date request has been received for $schemaName.$tableName")
 -
 -    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
 -      Option(schemaName),
 -      tableName,
 -     None
 -    )(sqlContext).asInstanceOf[CarbonRelation]
 +    LOGGER.audit("The delete load by load date request has been received.")
 +    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
 +    val identifier = TableIdentifier(tableName, Option(dbName))
 +    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
 +      .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
      if (relation == null) {
        LOGGER
-         .audit(s"The delete load by load date is failed. Table $dbName.$tableName does not " +
 -        .audit(s"Delete load by load date is failed. Table $schemaName.$tableName does not " +
++        .audit(s"Delete load by load date is failed. Table $dbName.$tableName does not " +
           s"exist")
 -      sys.error(s"Table $schemaName.$tableName does not exist")
 +      sys.error(s"Table $dbName.$tableName does not exist")
      }
  
+     val timeObj = Cast(Literal(loadDate), TimestampType).eval()
+     if(null == timeObj) {
+       val errorMessage = "Error: Invalid load start time format " + loadDate
+       throw new MalformedCarbonCommandException(errorMessage)
+     }
+ 
      var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
 -      .getCarbonTable(schemaName + '_' + tableName)
 +      .getCarbonTable(dbName + '_' + tableName)
      var segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
  
      if (null == carbonTable) {
@@@ -1378,9 -1409,14 +1398,14 @@@
      }
      var path = carbonTable.getMetaDataFilepath()
  
- 
-     var invalidLoadTimestamps = segmentStatusManager.updateDeletionStatus(loadDate, path).asScala
-     LOGGER.audit("The delete load by Id is successfull.")
+     var invalidLoadTimestamps = segmentStatusManager
+       .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
+     if(invalidLoadTimestamps.isEmpty) {
 -      LOGGER.audit(s"Delete load by load date is successfull for $schemaName.$tableName.")
++      LOGGER.audit(s"Delete load by load date is successfull for $dbName.$tableName.")
+     }
+     else {
+       sys.error("Delete load by load date is failed. No matching load found.")
+     }
      Seq.empty
  
    }
@@@ -1742,9 -1818,7 +1768,7 @@@ private[sql] case class DropTableComman
          if (carbonLock.lockWithRetries()) {
            logInfo("Successfully able to get the table metadata file lock")
          } else {
-           LOGGER.audit(
-             s"Dropping table with Database name [$dbName] and Table name [$tableName] " +
-             s"failed as the Table is locked")
 -          LOGGER.audit(s"Dropping table $schemaName.$cubeName failed as the Table is locked")
++          LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
            sys.error("Table is locked for updation. Please try after some time")
          }
  
@@@ -1877,7 -1976,9 +1901,9 @@@ private[sql] case class DescribeCommand
  
    override def run(sqlContext: SQLContext): Seq[Row] = {
      val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
 -      .lookupRelation2(tblIdentifier, None)(sqlContext).asInstanceOf[CarbonRelation]
 +      .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
+     val mapper = new ObjectMapper()
+     val colProps = StringBuilder.newBuilder
      var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
        val comment = if (relation.metaData.dims.contains(field.name)) {
          val dimension = relation.metaData.carbonTable.getDimensionByName(
@@@ -1966,17 -2085,16 +2010,17 @@@ private[sql] case class DeleteLoadByDat
  
    def run(sqlContext: SQLContext): Seq[Row] = {
  
-     LOGGER.audit("The delete load by date request has been received.")
 -    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
 -    LOGGER.audit(s"The delete load by date request has been received for $schemaName.$cubeName")
 +    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
++    LOGGER.audit(s"The delete load by date request has been received for $dbName.$cubeName")
 +    val identifier = TableIdentifier(cubeName, Option(dbName))
      val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
 -      .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).asInstanceOf[CarbonRelation]
 +      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
      var level: String = ""
      var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata
 -         .getInstance().getCarbonTable(schemaName + '_' + cubeName)
 +         .getInstance().getCarbonTable(dbName + '_' + cubeName)
      if (relation == null) {
 -      LOGGER.audit(s"The delete load by date is failed. Table $schemaName.$cubeName does not exist")
 -      sys.error(s"Table $schemaName.$cubeName does not exist")
 +      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$cubeName does not exist")
 +      sys.error(s"Table $dbName.$cubeName does not exist")
      }
  
      val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
@@@ -1986,8 -2104,8 +2030,8 @@@
      if (matches.isEmpty) {
        LOGGER.audit(
          "The delete load by date is failed. " +
-         "Table $dbName.$cubeName does not contain date field " + dateField)
 -        s"Table $schemaName.$cubeName does not contain date field :" + dateField)
 -      sys.error(s"Table $schemaName.$cubeName does not contain date field " + dateField)
++        "Table $dbName.$cubeName does not contain date field :" + dateField)
 +      sys.error(s"Table $dbName.$cubeName does not contain date field " + dateField)
      }
      else {
        level = matches.asJava.get(0).name
@@@ -2007,7 -2125,7 +2051,7 @@@
        actualColName,
        dateValue,
        relation.cubeMeta.partitioner)
-     LOGGER.audit("The delete load by date is successfull.")
 -    LOGGER.audit(s"The delete load by date $dateValue is successful for $schemaName.$cubeName.")
++    LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$cubeName.")
      Seq.empty
    }
  }
@@@ -2020,15 -2138,14 +2064,15 @@@ private[sql] case class CleanFiles
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  
    def run(sqlContext: SQLContext): Seq[Row] = {
-     LOGGER.audit("The clean files request has been received.")
 -    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
 -    LOGGER.audit(s"Clean files request has been received for $schemaName.$cubeName")
 +    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
++    LOGGER.audit(s"The clean files request has been received for $dbName.$cubeName")
 +    val identifier = TableIdentifier(cubeName, Option(dbName))
      val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
 -      .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).
 +      .lookupRelation1(identifier)(sqlContext).
        asInstanceOf[CarbonRelation]
      if (relation == null) {
 -      LOGGER.audit(s"Clean files request is failed. Table $schemaName.$cubeName does not exist")
 -      sys.error(s"Table $schemaName.$cubeName does not exist")
 +      LOGGER.audit(s"The clean files request is failed. Table $dbName.$cubeName does not exist")
 +      sys.error(s"Table $dbName.$cubeName does not exist")
      }
  
      val carbonLoadModel = new CarbonLoadModel()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index d87e132,f88b74e..aeae761
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@@ -113,14 -112,30 +113,6 @@@ class CarbonMetastoreCatalog(hive: Hive
  
    val metadata = loadMetadata(storePath)
  
-   lazy val useUniquePath = if ("true".equalsIgnoreCase(CarbonProperties.getInstance().
-     getProperty(
-       CarbonCommonConstants.CARBON_UNIFIED_STORE_PATH,
-       CarbonCommonConstants.CARBON_UNIFIED_STORE_PATH_DEFAULT))) {
-     true
-   } else {
-     false
 -  def lookupRelation1(
 -      databaseName: Option[String],
 -      tableName: String,
 -      alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
 -    val db = databaseName match {
 -      case Some(name) => name
 -      case _ => null
 -    }
 -    if (db == null) {
 -      lookupRelation2(Seq(tableName), alias)(sqlContext)
 -    } else {
 -      lookupRelation2(Seq(db, tableName), alias)(sqlContext)
 -    }
 -  }
 -
 -  override def lookupRelation(tableIdentifier: Seq[String],
 -      alias: Option[String] = None): LogicalPlan = {
 -    try {
 -      super.lookupRelation(tableIdentifier, alias)
 -    } catch {
 -      case s: java.lang.Exception =>
 -        lookupRelation2(tableIdentifier, alias)(hive.asInstanceOf[SQLContext])
 -    }
--  }
  
    def getCubeCreationTime(schemaName: String, cubeName: String): Long = {
      val cubeMeta = metadata.cubesMeta.filter(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index 0de2d1a,0000000..e7f131d
mode 100644,000000..100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@@ -1,218 -1,0 +1,221 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.carbondata.spark.rdd
 +
 +import java.util
 +
 +import scala.collection.JavaConverters._
 +import scala.reflect.ClassTag
 +
 +import org.apache.hadoop.conf.Configuration
 +import org.apache.hadoop.mapreduce.Job
 +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 +import org.apache.spark.rdd.RDD
 +
 +import org.carbondata.common.CarbonIterator
 +import org.carbondata.common.logging.LogServiceFactory
 +import org.carbondata.core.carbon.datastore.block.TableBlockInfo
 +import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 +import org.carbondata.scan.executor.QueryExecutorFactory
 +import org.carbondata.scan.expression.Expression
 +import org.carbondata.scan.model.QueryModel
 +import org.carbondata.scan.result.BatchResult
 +import org.carbondata.scan.result.iterator.ChunkRowIterator
 +import org.carbondata.spark.RawValue
 +import org.carbondata.spark.load.CarbonLoaderUtil
 +import org.carbondata.spark.util.QueryPlanUtil
 +
 +class CarbonSparkPartition(rddId: Int, val idx: Int,
 +  val locations: Array[String],
 +  val tableBlockInfos: util.List[TableBlockInfo])
 +  extends Partition {
 +
 +  override val index: Int = idx
 +
 +  // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
 +  override def hashCode(): Int = {
 +    41 * (41 + rddId) + idx
 +  }
 +}
 +
 + /**
 +  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
 +  * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
 +  * level filtering in driver side.
 +  */
 +class CarbonScanRDD[V: ClassTag](
 +  sc: SparkContext,
 +  queryModel: QueryModel,
 +  filterExpression: Expression,
 +  keyClass: RawValue[V],
 +  @transient conf: Configuration,
 +  cubeCreationTime: Long,
 +  schemaLastUpdatedTime: Long,
 +  baseStoreLocation: String)
 +  extends RDD[V](sc, Nil) with Logging {
 +
 +  val defaultParallelism = sc.defaultParallelism
 +
 +  override def getPartitions: Array[Partition] = {
 +    val startTime = System.currentTimeMillis()
 +    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
 +      QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
 +
 +    val result = new util.ArrayList[Partition](defaultParallelism)
 +    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 +    // set filter resolver tree
 +    try {
-       val filterResolver = carbonInputFormat
-         .getResolvedFilter(job.getConfiguration, filterExpression)
- 
-       CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
-       queryModel.setFilterExpressionResolverTree(filterResolver)
++      // before applying filter check whether segments are available in the table.
++      val splits = carbonInputFormat.getSplits(job)
++      if (!splits.isEmpty) {
++        var filterResolver = carbonInputFormat
++          .getResolvedFilter(job.getConfiguration, filterExpression)
++        CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
++        queryModel.setFilterExpressionResolverTree(filterResolver)
++      }
 +    }
 +    catch {
 +      case e: Exception =>
 +        LOGGER.error(e)
 +        sys.error("Exception occurred in query execution :: " + e.getMessage)
 +    }
 +    // get splits
 +    val splits = carbonInputFormat.getSplits(job)
 +    if (!splits.isEmpty) {
 +      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 +
 +      val blockList = carbonInputSplits.map(inputSplit =>
 +        new TableBlockInfo(inputSplit.getPath.toString,
 +          inputSplit.getStart, inputSplit.getSegmentId,
 +          inputSplit.getLocations, inputSplit.getLength
 +        )
 +      )
 +      if (blockList.nonEmpty) {
 +        // group blocks to nodes, tasks
 +        val nodeBlockMapping =
 +          CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism)
 +
 +        var i = 0
 +        // Create Spark Partition for each task and assign blocks
 +        nodeBlockMapping.asScala.foreach { entry =>
 +          entry._2.asScala.foreach { blocksPerTask =>
 +            if (blocksPerTask.size() != 0) {
 +              result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, blocksPerTask))
 +              i += 1
 +            }
 +          }
 +        }
 +        val noOfBlocks = blockList.size
 +        val noOfNodes = nodeBlockMapping.size
 +        val noOfTasks = result.size()
 +        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
 +          + s"parallelism: $defaultParallelism , " +
 +          s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
 +        )
 +        logInfo("Time taken to identify Blocks to scan : " +
 +          (System.currentTimeMillis() - startTime)
 +        )
 +        result.asScala.foreach { r =>
 +          val cp = r.asInstanceOf[CarbonSparkPartition]
 +          logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
 +            + ", No.Of Blocks : " + cp.tableBlockInfos.size()
 +          )
 +        }
 +      } else {
 +        logInfo("No blocks identified to scan")
 +        val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +        result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +      }
 +    }
 +    else {
 +      logInfo("No valid segments found to scan")
 +      val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +      result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +    }
 +    result.toArray(new Array[Partition](result.size()))
 +  }
 +
 +   override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
 +     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 +     val iter = new Iterator[V] {
 +       var rowIterator: CarbonIterator[Array[Any]] = _
 +       var queryStartTime: Long = 0
 +       try {
 +         val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
 +         if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
 +           queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
 +           // fill table block info
 +           queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
 +           queryStartTime = System.currentTimeMillis
 +
 +           val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
 +           logInfo("*************************" + carbonPropertiesFilePath)
 +           if (null == carbonPropertiesFilePath) {
 +             System.setProperty("carbon.properties.filepath",
 +               System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
 +           }
 +           // execute query
 +           rowIterator = new ChunkRowIterator(
 +             QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
 +               asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
 +
 +         }
 +       } catch {
 +         case e: Exception =>
 +           LOGGER.error(e)
 +           if (null != e.getMessage) {
 +             sys.error("Exception occurred in query execution :: " + e.getMessage)
 +           } else {
 +             sys.error("Exception occurred in query execution.Please check logs.")
 +           }
 +       }
 +
 +       var havePair = false
 +       var finished = false
 +
 +       override def hasNext: Boolean = {
 +         if (!finished && !havePair) {
 +           finished = (null == rowIterator) || (!rowIterator.hasNext)
 +           havePair = !finished
 +         }
 +         !finished
 +       }
 +
 +       override def next(): V = {
 +         if (!hasNext) {
 +           throw new java.util.NoSuchElementException("End of stream")
 +         }
 +         havePair = false
 +         keyClass.getValue(rowIterator.next())
 +       }
 +
 +       logInfo("********************** Total Time Taken to execute the query in Carbon Side: " +
 +           (System.currentTimeMillis - queryStartTime)
 +       )
 +     }
 +     iter
 +   }
 +
 +   /**
 +    * Get the preferred locations where to launch this task.
 +    */
 +  override def getPreferredLocations(partition: Partition): Seq[String] = {
 +    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
 +    theSplit.locations.filter(_ != "localhost")
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index b6bda0c,4a02975..d13a9df
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@@ -40,19 -40,15 +40,19 @@@ class AllDataTypesTestCaseAggregate ext
      val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
        .getCanonicalPath
  
-     sql("create cube Carbon_automation_test dimensions(imei string,deviceInformationId integer,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active
 _phonePADPartitionedVersions string, Latest_YEAR integer, Latest_MONTH integer, Latest_DAY integer, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string)  measures(gamePointId integer,contractNumber integer) OPTIONS (PARTITIONER [CLASS = 'org.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl' ,COLUMNS= (imei) , PARTITION_COUNT=2] )");
+     sql("create table Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPart
 itionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_EXCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-     sql("LOAD DATA FACT FROM'"+currentDirectory+"/src/test/resources/100_olap.csv' INTO Cube Carbon_automation_test partitionData(DELIMITER ',' ,QUOTECHAR '\"', FILEHEADER 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUI
 Version,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')");
+     sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_test options('DELIMITER'= ',' ,'QUOTECHAR'= '\"', 'FILEHEADER'= 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Lat
 est_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')");
 +    sql("create table if not exists Carbon_automation_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Activ
 e_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) row format delimited fields terminated by ','");
 +    sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_hive ");
  
    }
  
    override def afterAll {
-     sql("drop cube Carbon_automation_test")
-     sql("drop cube Carbon_automation_hive")
+     sql("drop table Carbon_automation_test")
++    sql("drop table Carbon_automation_hive")
 +
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
    }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
index 0a0fe62,0000000..a1cf40a
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
@@@ -1,162 -1,0 +1,162 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.lcm.locks;
 +
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.channels.FileChannel;
 +import java.nio.channels.FileLock;
 +import java.nio.channels.OverlappingFileLockException;
 +
 +import org.carbondata.common.logging.LogService;
 +import org.carbondata.common.logging.LogServiceFactory;
 +import org.carbondata.core.datastorage.store.impl.FileFactory;
 +
 +/**
 + * This class handles the file locking in the local file system.
 + * This will be handled using the file channel lock API.
 + */
 +public class LocalFileLock extends AbstractCarbonLock {
 +  /**
 +   * location is the lock file path: tmpPath/schemaName/cubeName/lockUsage (built in constructor).
 +   */
 +  private String location;
 +
 +  /**
 +   * lockUsage is the last component of the lock file path, so locks with the same usage on the
 +   * same schema and cube try to acquire the same lock file.
 +   */
 +  private LockUsage lockUsage;
 +
 +  /**
 +   * fileOutputStream is opened on the lock file in lock() and closed in unlock().
 +   */
 +  private FileOutputStream fileOutputStream;
 +
 +  /**
 +   * channel is the FileChannel obtained from fileOutputStream in lock().
 +   */
 +  private FileChannel channel;
 +
 +  /**
 +   * fileLock is the NIO FileLock returned by channel.tryLock() in lock().
 +   */
 +  private FileLock fileLock;
 +
 +  public static final String tmpPath;
 +
 +  private String cubeName;
 +
 +  private String schemaName;
 +
 +  /**
 +   * LOGGER for logging the messages.
 +   */
 +  private static final LogService LOGGER =
 +      LogServiceFactory.getLogService(LocalFileLock.class.getName());
 +
 +  static {
 +    tmpPath = System.getProperty("java.io.tmpdir");
 +  }
 +
 +  /**
 +   * @param location  path whose parent and grandparent directory names become cubeName/schemaName
 +   * @param lockUsage usage whose string form is the lock file name under tmpPath/schemaName/cubeName
 +   */
 +  public LocalFileLock(String location, LockUsage lockUsage) {
 +    this.lockUsage = lockUsage;
 +    location = location.replace("\\", "/");
 +    String tempStr = location.substring(0, location.lastIndexOf('/'));
++    cubeName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
++    tempStr = tempStr.substring(0, tempStr.lastIndexOf('/'));
 +    schemaName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
- 
-     cubeName = location.substring(location.lastIndexOf('/') + 1, location.length());
 +    this.location =
 +        tmpPath + File.separator + schemaName + File.separator + cubeName + File.separator
 +            + this.lockUsage;
 +    initRetry();
 +  }
 +
 +  /**
 +   * Creates the lock folders and file if missing, then calls tryLock() on the lock file's channel.
 +   * NOTE(review): the stream opened here is not closed on the failure paths of this method.
 +   * @return true if a FileLock was obtained; false on overlapping lock, null lock or IOException
 +   */
 +  @Override public boolean lock() {
 +    try {
 +      String schemaFolderPath = tmpPath + File.separator + schemaName;
 +      String cubeFolderPath = schemaFolderPath + File.separator + cubeName;
 +      // create dir with schema name in tmp location.
 +      if (!FileFactory.isFileExist(schemaFolderPath, FileFactory.getFileType(tmpPath))) {
 +        FileFactory.mkdirs(schemaFolderPath, FileFactory.getFileType(tmpPath));
 +      }
 +
 +      // create dir with cube name in tmp location.
 +      if (!FileFactory.isFileExist(cubeFolderPath, FileFactory.getFileType(tmpPath))) {
 +        FileFactory.mkdirs(cubeFolderPath, FileFactory.getFileType(tmpPath));
 +      }
 +      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
 +        FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
 +      }
 +
 +      fileOutputStream = new FileOutputStream(location);
 +      channel = fileOutputStream.getChannel();
 +      try {
 +        fileLock = channel.tryLock();
 +      } catch (OverlappingFileLockException e) {
 +        return false;
 +      }
 +      if (null != fileLock) {
 +        return true;
 +      } else {
 +        return false;
 +      }
 +    } catch (IOException e) {
 +      return false;
 +    }
 +
 +  }
 +
 +  /**
 +   * Releases fileLock if it is non-null, then closes fileOutputStream (close errors only logged).
 +   *
 +   * @return true unless fileLock.release() threw an IOException
 +   */
 +  @Override public boolean unlock() {
 +    boolean status;
 +    try {
 +      if (null != fileLock) {
 +        fileLock.release();
 +      }
 +      status = true;
 +    } catch (IOException e) {
 +      status = false;
 +    } finally {
 +      if (null != fileOutputStream) {
 +        try {
 +          fileOutputStream.close();
 +        } catch (IOException e) {
 +          LOGGER.error(e.getMessage());
 +        }
 +      }
 +    }
 +    return status;
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
index e3eab01,0000000..3b03b1f
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
@@@ -1,29 -1,0 +1,30 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.lcm.locks;
 +
 +/**
 + * This enum defines the use case of a lock.
 + * Each enum value identifies one specific kind of lock.
 + */
 +public enum LockUsage {
 +  METADATA_LOCK,
-   COMPACTION_LOCK;
++  COMPACTION_LOCK,
++  TABLE_STATUS_LOCK;
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
index dcd5e19,473840b..3dd64be
--- a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@@ -37,9 -39,10 +39,8 @@@ import org.carbondata.core.carbon.metad
  import org.carbondata.core.carbon.path.CarbonStorePath;
  import org.carbondata.core.carbon.path.CarbonTablePath;
  import org.carbondata.core.constants.CarbonCommonConstants;
 -import org.carbondata.core.file.manager.composite.FileData;
 -import org.carbondata.core.file.manager.composite.FileManager;
 -import org.carbondata.core.file.manager.composite.IFileManagerComposite;
 +import org.carbondata.core.datastorage.store.columnar.ColumnGroupModel;
  import org.carbondata.core.keygenerator.KeyGenException;
- import org.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
  import org.carbondata.core.util.CarbonProperties;
  import org.carbondata.core.util.CarbonUtil;
  import org.carbondata.core.util.CarbonUtilException;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------