You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:41 UTC

[43/47] incubator-carbondata git commit: Merge remote-tracking branch 'carbon_master/master' into apache/master

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 74239a3,5c25406..22b6021
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@@ -31,7 -32,8 +32,8 @@@ import org.apache.spark.sql.catalyst.{S
  import org.apache.spark.sql.catalyst.analysis._
  import org.apache.spark.sql.catalyst.plans.logical._
  import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+ import org.apache.spark.sql.execution.ExplainCommand
 -import org.apache.spark.sql.execution.command.{DimensionRelation, _}
 +import org.apache.spark.sql.execution.command._
  import org.apache.spark.sql.execution.datasources.DescribeCommand
  import org.apache.spark.sql.hive.HiveQlWrapper
  
@@@ -332,109 -340,122 +340,123 @@@ class CarbonSqlParser(
        // if create table taken is found then only we will handle.
        case Token("TOK_CREATETABLE", children) =>
  
-         var fields: Seq[Field] = Seq[Field]()
-         var tableComment: String = ""
-         var tableProperties = Map[String, String]()
-         var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
-         var likeTableName: String = ""
-         var storedBy: String = ""
-         var ifNotExistPresent: Boolean = false
-         var dbName: Option[String] = None
-         var tableName: String = ""
  
-         children.collect {
-           // collecting all the field  list
-           case list@Token("TOK_TABCOLLIST", _) =>
-             val cols = BaseSemanticAnalyzer.getColumns(list, true)
-             if (cols != null) {
-               val dupColsGrp = cols.asScala
-                                  .groupBy(x => x.getName) filter { case (_, colList) => colList
-                                                                                           .size > 1
-                                }
-               if (dupColsGrp.size > 0) {
-                 var columnName: String = ""
-                 dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
-                 columnName = columnName.substring(0, columnName.lastIndexOf(", "))
-                 val errorMessage = "Duplicate column name: " + columnName + " found in table " +
-                                    ".Please check create table statement."
-                 throw new MalformedCarbonCommandException(errorMessage)
-               }
-               cols.asScala.map { col =>
-                 val columnName = col.getName()
-                 val dataType = Option(col.getType)
-                 val name = Option(col.getName())
-                 // This is to parse complex data types
-                 val x = col.getName + ' ' + col.getType
-                 val f: Field = anyFieldDef(new lexical.Scanner(x))
-                 match {
-                   case Success(field, _) => field
-                   case failureOrError => new Field(columnName, dataType, name, None, null,
-                     Some("columnar"))
+           var fields: Seq[Field] = Seq[Field]()
+           var tableComment: String = ""
+           var tableProperties = Map[String, String]()
+           var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
+           var likeTableName: String = ""
+           var storedBy: String = ""
+           var ifNotExistPresent: Boolean = false
+           var dbName: Option[String] = None
+           var tableName: String = ""
+ 
+           try {
+ 
+           children.collect {
+             // collecting all the field  list
+             case list@Token("TOK_TABCOLLIST", _) =>
+               val cols = BaseSemanticAnalyzer.getColumns(list, true)
+               if (cols != null) {
+                 val dupColsGrp = cols.asScala.groupBy(x => x.getName) filter {
+                   case (_, colList) => colList.size > 1
+                 }
+                 if (dupColsGrp.size > 0) {
+                   var columnName: String = ""
+                   dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
+                   columnName = columnName.substring(0, columnName.lastIndexOf(", "))
+                   val errorMessage = "Duplicate column name: " + columnName + " found in table " +
+                                      ".Please check create table statement."
+                   throw new MalformedCarbonCommandException(errorMessage)
                  }
-                 // the data type of the decimal type will be like decimal(10,0)
-                 // so checking the start of the string and taking the precision and scale.
-                 // resetting the data type with decimal
-                 if (f.dataType.getOrElse("").startsWith("decimal")) {
-                   val (precision, scale) = getScaleAndPrecision(col.getType)
-                   f.precision = precision
-                   f.scale = scale
-                   f.dataType = Some("decimal")
+                 cols.asScala.map { col =>
+                   val columnName = col.getName()
+                   val dataType = Option(col.getType)
+                   val name = Option(col.getName())
+                   // This is to parse complex data types
+                   val x = col.getName + ' ' + col.getType
+                   val f: Field = anyFieldDef(new lexical.Scanner(x))
+                   match {
+                     case Success(field, _) => field
+                     case failureOrError => new Field(columnName, dataType, name, None, null,
+                       Some("columnar"))
+                   }
+                   // the data type of the decimal type will be like decimal(10,0)
+                   // so checking the start of the string and taking the precision and scale.
+                   // resetting the data type with decimal
+                   if (f.dataType.getOrElse("").startsWith("decimal")) {
+                     val (precision, scale) = getScaleAndPrecision(col.getType)
+                     f.precision = precision
+                     f.scale = scale
+                     f.dataType = Some("decimal")
+                   }
+                   fields ++= Seq(f)
                  }
-                 fields ++= Seq(f)
                }
-             }
  
-           case Token("TOK_IFNOTEXISTS", _) =>
-             ifNotExistPresent = true
- 
-           case t@Token("TOK_TABNAME", _) =>
-             val (db, tblName) = extractDbNameTableName(t)
-             dbName = db
-             tableName = tblName.toLowerCase()
- 
-           case Token("TOK_TABLECOMMENT", child :: Nil) =>
-             tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
- 
-           case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
-             val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
-             if (cols != null) {
-               cols.asScala.map { col =>
-                 val columnName = col.getName()
-                 val dataType = Option(col.getType)
-                 val comment = col.getComment
-                 val partitionCol = new PartitionerField(columnName, dataType, comment)
-                 partitionCols ++= Seq(partitionCol)
+             case Token("TOK_IFNOTEXISTS", _) =>
+               ifNotExistPresent = true
+ 
+             case t@Token("TOK_TABNAME", _) =>
+               val (db, tblName) = extractDbNameTableName(t)
+               dbName = db
+               tableName = tblName.toLowerCase()
+ 
+             case Token("TOK_TABLECOMMENT", child :: Nil) =>
+               tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+ 
+             case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
+               val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+               if (cols != null) {
+                 cols.asScala.map { col =>
+                   val columnName = col.getName()
+                   val dataType = Option(col.getType)
+                   val comment = col.getComment
+                   val partitionCol = new PartitionerField(columnName, dataType, comment)
+                   partitionCols ++= Seq(partitionCol)
+                 }
                }
-             }
-           case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
-             tableProperties ++= getProperties(list)
+             case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+               tableProperties ++= getProperties(list)
  
-           case Token("TOK_LIKETABLE", child :: Nil) =>
-             likeTableName = child.getChild(0).getText()
+             case Token("TOK_LIKETABLE", child :: Nil) =>
+               likeTableName = child.getChild(0).getText()
  
-           case Token("TOK_STORAGEHANDLER", child :: Nil) =>
-             storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+             case Token("TOK_STORAGEHANDLER", child :: Nil) =>
+               storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
  
-           case _ => // Unsupport features
-         }
+             case _ => // Unsupport features
+           }
  
 -          if (!storedBy.equals(CarbonContext.datasourceName)) {
 -            // TODO: should execute by Hive instead of error
 -            sys.error("Not a carbon format request")
 -          }
 +        if (!(storedBy.equals(CarbonContext.datasourceName) ||
 +              storedBy.equals(CarbonContext.datasourceShortName))) {
 +          // TODO: should execute by Hive instead of error
 +          sys.error("Not a carbon format request")
 +        }
  
-       // validate tblProperties
-       if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
-         throw new MalformedCarbonCommandException("Invalid table properties")
-       }
-       // prepare table model of the collected tokens
-       val tableModel: tableModel = prepareTableModel(ifNotExistPresent, dbName, tableName, fields,
-         partitionCols,
-         tableProperties)
- 
-         // get logical plan.
-         CreateTable(tableModel)
+           // validate tblProperties
+           if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
+             throw new MalformedCarbonCommandException("Invalid table properties")
+           }
+           // prepare table model of the collected tokens
+           val tableModel: tableModel = prepareTableModel(ifNotExistPresent,
+             dbName,
+             tableName,
+             fields,
+             partitionCols,
+             tableProperties)
+ 
+           // get logical plan.
+           CreateTable(tableModel)
+         }
+         catch {
+           case ce: MalformedCarbonCommandException =>
+             val message = if (tableName.isEmpty) "Create table command failed. "
+             else if (!dbName.isDefined) s"Create table command failed for $tableName. "
+             else s"Create table command failed for ${dbName.get}.$tableName. "
+             LOGGER.audit(message + ce.getMessage)
+             throw ce
+         }
  
      }
    }
@@@ -1295,7 -1343,16 +1324,16 @@@
  
    protected lazy val cleanFiles: Parser[LogicalPlan] =
      CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
 -      case schemaName ~ cubeName => CleanFiles(schemaName, cubeName.toLowerCase())
 +      case databaseName ~ tableName => CleanFiles(databaseName, tableName.toLowerCase())
      }
  
+   protected lazy val explainPlan: Parser[LogicalPlan] =
+     (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
+       case isExtended ~ logicalPlan =>
+         logicalPlan match {
+           case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
+           case _ => ExplainCommand(OneRowRelation)
+       }
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index e8f6e11,1162db4..9a16f77
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@@ -24,15 -25,17 +25,17 @@@ import org.apache.spark.sql.catalyst.In
  import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
  
  import org.carbondata.core.carbon.metadata.encoder.Encoding
 -import org.carbondata.query.carbonfilterinterface.{ExpressionType, RowIntf}
 -import org.carbondata.query.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
 -import org.carbondata.query.expression.conditional.ConditionalExpression
 -import org.carbondata.query.expression.exception.FilterUnsupportedException
 +import org.carbondata.scan.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
 +import org.carbondata.scan.expression.conditional.ConditionalExpression
 +import org.carbondata.scan.expression.exception.FilterUnsupportedException
 +import org.carbondata.scan.filter.intf.{ExpressionType, RowIntf}
  import org.carbondata.spark.util.CarbonScalaUtil
  
- class SparkUnknownExpression(sparkExp: SparkExpression)
+ class SparkUnknownExpression(var sparkExp: SparkExpression)
    extends UnknownExpression with ConditionalExpression {
  
+   private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
+   private var isExecutor: Boolean = false
    children.addAll(getColumnList())
  
    override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d6cbaf4,1dd066f..01bb218
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@@ -156,8 -157,13 +156,13 @@@ case class AlterTableModel(dbName: Opti
  case class CompactionModel(compactionSize: Long,
    compactionType: CompactionType,
    carbonTable: CarbonTable,
 -  cubeCreationTime: Long)
 +  tableCreationTime: Long)
  
+ case class CompactionCallableModel(hdfsStoreLocation: String, carbonLoadModel: CarbonLoadModel,
+   partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
+   cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext,
+   compactionType: CompactionType)
+ 
  object TableNewProcessor {
    def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
      new TableNewProcessor(cm, sqlContext).process
@@@ -767,12 -1201,12 +772,12 @@@ private[sql] case class AlterTableCompa
  
    def run(sqlContext: SQLContext): Seq[Row] = {
      // TODO : Implement it.
-     var tableName = alterTableModel.tableName
+     val tableName = alterTableModel.tableName
 -    val schemaName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
 +    val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
      if (null == org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
 -      .getCarbonTable(schemaName + "_" + tableName)) {
 -      logError("alter table failed. table not found: " + schemaName + "." + tableName)
 -      sys.error("alter table failed. table not found: " + schemaName + "." + tableName)
 +      .getCarbonTable(databaseName + "_" + tableName)) {
 +      logError("alter table failed. table not found: " + databaseName + "." + tableName)
 +      sys.error("alter table failed. table not found: " + databaseName + "." + tableName)
      }
  
      val relation =
@@@ -975,20 -1417,23 +980,20 @@@ private[sql] case class DeleteLoadsByLo
        throw new MalformedCarbonCommandException(errorMessage)
      }
  
-     var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+     val carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
 -      .getCarbonTable(schemaName + '_' + tableName)
 +      .getCarbonTable(dbName + '_' + tableName)
-     var segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+     val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
  
      if (null == carbonTable) {
 -      var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
 -        Option(schemaName),
 -        tableName,
 -        None
 -      )(sqlContext).asInstanceOf[CarbonRelation]
 +      var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
 +        .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
      }
-     var path = carbonTable.getMetaDataFilepath()
+     val path = carbonTable.getMetaDataFilepath()
  
-     var invalidLoadTimestamps = segmentStatusManager
+     val invalidLoadTimestamps = segmentStatusManager
        .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
      if(invalidLoadTimestamps.isEmpty) {
 -      LOGGER.audit(s"Delete load by load date is successfull for $schemaName.$tableName.")
 +      LOGGER.audit(s"Delete load by load date is successfull for $dbName.$tableName.")
      }
      else {
        sys.error("Delete load by load date is failed. No matching load found.")
@@@ -1491,17 -2116,16 +1503,17 @@@ private[sql] case class DeleteLoadByDat
  
    def run(sqlContext: SQLContext): Seq[Row] = {
  
 -    val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
 -    LOGGER.audit(s"The delete load by date request has been received for $schemaName.$cubeName")
 +    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
 +    LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
 +    val identifier = TableIdentifier(tableName, Option(dbName))
      val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
 -      .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).asInstanceOf[CarbonRelation]
 +      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
      var level: String = ""
-     var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata
+     val carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata
 -         .getInstance().getCarbonTable(schemaName + '_' + cubeName)
 +         .getInstance().getCarbonTable(dbName + '_' + tableName)
      if (relation == null) {
 -      LOGGER.audit(s"The delete load by date is failed. Table $schemaName.$cubeName does not exist")
 -      sys.error(s"Table $schemaName.$cubeName does not exist")
 +      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
 +      sys.error(s"Table $dbName.$tableName does not exist")
      }
  
      val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 07b505c,e18e1d3..0775fea
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@@ -155,11 -207,19 +155,19 @@@ class CarbonMetastoreCatalog(hiveContex
  
      // creating zookeeper instance once.
      // if zookeeper is configured as carbon lock type.
-     if (CarbonProperties.getInstance()
-       .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT)
-       .equalsIgnoreCase(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) {
-       val zookeeperUrl = hiveContext.getConf("spark.deploy.zookeeper.url", "127.0.0.1:2181")
 -    val zookeeperUrl: String = hive.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
++    val zookeeperUrl: String = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
+     if (zookeeperUrl != null) {
+       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
        ZookeeperInit.getInstance(zookeeperUrl)
+       LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
+       var configuredLockType = CarbonProperties.getInstance
+       .getProperty(CarbonCommonConstants.LOCK_TYPE)
+       if (null == configuredLockType) {
+         configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
+         CarbonProperties.getInstance
+             .addProperty(CarbonCommonConstants.LOCK_TYPE,
+                 configuredLockType)
+       }
      }
  
      if (metadataPath == null) {
@@@ -429,25 -489,16 +437,16 @@@
        }
      }
  
 -    metadata.cubesMeta -= metadata.cubesMeta.filter(
 -      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
 -           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))(0)
 +    metadata.tablesMeta -= metadata.tablesMeta.filter(
 +      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) &&
 +           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))(0)
      org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
 -      .removeTable(schemaName + "_" + cubeName)
 +      .removeTable(dbName + "_" + tableName)
  
-     try {
-       sqlContext.sql(s"DROP TABLE $dbName.$tableName").collect()
-     } catch {
-       case e: Exception =>
-         LOGGER.audit(
-           s"Error While deleting the table $dbName.$tableName during drop Table" + e.getMessage)
-     }
-     logInfo(s"Table $tableName of $dbName Database dropped syccessfully.")
-     LOGGER.info("Table " + tableName + " of " + dbName + " Database dropped syccessfully.")
- 
 -    sqlContext.asInstanceOf[HiveContext].runSqlHive(s"DROP TABLE IF EXISTS $schemaName.$cubeName")
++    sqlContext.asInstanceOf[HiveContext].runSqlHive(s"DROP TABLE IF EXISTS $dbName.$tableName")
    }
  
 -  private def getTimestampFileAndType(schemaName: String, cubeName: String) = {
 +  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
  
      val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
  

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 85c6a1d,49e6702..748a408
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@@ -405,44 -464,48 +410,48 @@@ object CarbonDataRDDFactory extends Log
        *
        * @param futureList
       */
-     def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]]): Unit = {
-       breakable {
-         while (true) {
- 
-           val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-             hdfsStoreLocation,
-             carbonLoadModel,
-             partitioner.partitionCount,
-             compactionModel.compactionSize,
-             segList,
-             compactionModel.compactionType
-           )
-           if (loadsToMerge.size() > 1) {
-             loadsToMerge.asScala.foreach(seg => {
-               logger.info("load identified for merge is " + seg.getLoadName)
-             }
-             )
+     def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], loadsToMerge: util
+     .List[LoadMetadataDetails]): Unit = {
  
-             val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
-               carbonLoadModel,
-               partitioner,
-               storeLocation,
-               compactionModel.carbonTable,
-               kettleHomePath,
-               compactionModel.tableCreationTime,
-               loadsToMerge,
-               sqlContext
-             )
-             )
-             futureList.add(future)
-             segList = CarbonDataMergerUtil
-               .filterOutAlreadyMergedSegments(segList, loadsToMerge)
-           }
-           else {
-             break
-           }
-         }
+       loadsToMerge.asScala.foreach(seg => {
+         logger.info("loads identified for merge is " + seg.getLoadName)
        }
+       )
+ 
+       val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
+         carbonLoadModel,
+         partitioner,
+         storeLocation,
+         compactionModel.carbonTable,
+         kettleHomePath,
 -        compactionModel.cubeCreationTime,
++        compactionModel.tableCreationTime,
+         loadsToMerge,
+         sqlContext,
+         compactionModel.compactionType
+       )
+ 
+       val future: Future[Void] = executor
+         .submit(new CompactionCallable(compactionCallableModel
+         )
+         )
+       futureList.add(future)
+     }
+   }
+ 
+   def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
+     // Deleting the any partially loaded data if present.
+     // in some case the segment folder which is present in store will not have entry in
+     // status.
+     // so deleting those folders.
+     try {
+       CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+     }
+     catch {
+       case e: Exception =>
+         logger
+           .error("Exception in compaction thread while clean up of stale segments " + e
+             .getMessage
+           )
      }
    }
  
@@@ -713,50 -776,59 +722,59 @@@
        CarbonLoaderUtil.checkAndCreateCarbonDataLocation(hdfsStoreLocation,
          carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
          partitioner.partitionCount, currentLoadCount.toString)
-       val status = new
-           CarbonDataLoadRDD(sqlContext.sparkContext,
-             new DataLoadResultImpl(),
-             carbonLoadModel,
-             storeLocation,
-             hdfsStoreLocation,
-             kettleHomePath,
-             partitioner,
-             columinar,
-             currentRestructNumber,
-             currentLoadCount,
-             tableCreationTime,
-             schemaLastUpdatedTime,
-             blocksGroupBy,
-             isTableSplitPartition
-           ).collect()
-       val newStatusMap = scala.collection.mutable.Map.empty[String, String]
-       status.foreach { eachLoadStatus =>
-         val state = newStatusMap.get(eachLoadStatus._1)
-         state match {
-           case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
-             newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-           case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-             if eachLoadStatus._2.getLoadStatus == CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
-             newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-           case _ =>
-             newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-         }
-       }
- 
        var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-       newStatusMap.foreach {
-         case (key, value) =>
-           if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-             loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-           } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-             !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
-             loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+       var status: Array[(String, LoadMetadataDetails)] = null
+       try {
+         status = new
 -            CarbonDataLoadRDD(sc.sparkContext,
++            CarbonDataLoadRDD(sqlContext.sparkContext,
+               new DataLoadResultImpl(),
+               carbonLoadModel,
+               storeLocation,
+               hdfsStoreLocation,
+               kettleHomePath,
+               partitioner,
+               columinar,
+               currentRestructNumber,
+               currentLoadCount,
 -              cubeCreationTime,
++              tableCreationTime,
+               schemaLastUpdatedTime,
+               blocksGroupBy,
+               isTableSplitPartition
+             ).collect()
+         val newStatusMap = scala.collection.mutable.Map.empty[String, String]
+         status.foreach { eachLoadStatus =>
+           val state = newStatusMap.get(eachLoadStatus._1)
+           state match {
+             case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+               newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+             case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+               if eachLoadStatus._2.getLoadStatus ==
+                  CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+               newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+             case _ =>
+               newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
            }
-       }
+         }
  
-       if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
-         partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
-         loadStatus = partitionStatus
+         newStatusMap.foreach {
+           case (key, value) =>
+             if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+               loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+             } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+                        !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+               loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+             }
+         }
+ 
+         if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
+             partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
+           loadStatus = partitionStatus
+         }
+       } catch {
+         case ex: Throwable =>
+           loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+           logInfo("DataLoad failure")
+           logger.error(ex)
        }
  
        if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
@@@ -779,10 -851,12 +797,10 @@@
              aggTables.asScala.foreach { aggTableName =>
                CarbonLoaderUtil
                  .deleteSlice(partitioner.partitionCount, carbonLoadModel.getDatabaseName,
 -                  carbonLoadModel.getTableName, aggTableName, hdfsStoreLocation,
 -                  currentRestructNumber, newSlice
 -                )
 +                  carbonLoadModel.getTableName, hdfsStoreLocation, currentRestructNumber, newSlice)
              }
            }
-           message = "Dataload failure"
+           message = "DataLoad failure"
          }
          logInfo("********clean up done**********")
          logger.audit(s"Data load is failed for " +

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 78538f1,70d0cb1..e79937f
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@@ -35,8 -36,10 +36,11 @@@ import org.carbondata.core.carbon.{Carb
  import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
  import org.carbondata.core.constants.CarbonCommonConstants
  import org.carbondata.core.datastorage.store.impl.FileFactory
+ import org.carbondata.core.locks.CarbonLockFactory
+ import org.carbondata.core.locks.LockUsage
+ import org.carbondata.core.util.CarbonProperties
  import org.carbondata.core.util.CarbonTimeStatisticsFactory
 +import org.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
  import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
  import org.carbondata.spark.partition.reader.{CSVParser, CSVReader}
  import org.carbondata.spark.tasks.DictionaryWriterTask

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index e22869c,7b15cbf..f687006
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@@ -67,83 -70,119 +69,119 @@@ class CarbonMergerRDD[K, V]
    override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
      val iter = new Iterator[(K, V)] {
-       var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-       val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
  
-       val tempLocationKey: String = carbonLoadModel.getDatabaseName + '_' + carbonLoadModel
-         .getTableName + carbonLoadModel.getTaskNo
+       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+       val tempLocationKey: String = CarbonCommonConstants
+         .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+         .getDatabaseName + '_' + carbonLoadModel
+         .getTableName + '_' + carbonLoadModel.getTaskNo
+ 
+       val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+       if (null != storeLocations && storeLocations.length > 0) {
+         storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+       }
+       if (storeLocation == null) {
+         storeLocation = System.getProperty("java.io.tmpdir")
+       }
+       storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
        CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+       LOGGER.info("Temp storeLocation taken is " + storeLocation)
+       var mergeStatus = false
+       var mergeNumber = ""
+       try {
+         var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+         val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
  
-       // sorting the table block info List.
-       var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
+         // sorting the table block info List.
+         var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
  
-       Collections.sort(tableBlockInfoList)
+         Collections.sort(tableBlockInfoList)
  
-       val segmentMapping: java.util.Map[String, TaskBlockInfo] =
-         CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
+         val segmentMapping: java.util.Map[String, TaskBlockInfo] =
+           CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
  
-       val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
-         CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
+         val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
+           CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
  
-       carbonLoadModel.setStorePath(hdfsStoreLocation)
+         carbonLoadModel.setStorePath(hdfsStoreLocation)
  
-       // taking the last table block info for getting the segment properties.
-       val listMetadata = dataFileMetadataSegMapping.get(tableBlockInfoList.get
-       (tableBlockInfoList.size()-1).getSegmentId())
+         // taking the last table block info for getting the segment properties.
+         val listMetadata = dataFileMetadataSegMapping.get(tableBlockInfoList.get
+         (tableBlockInfoList.size() - 1).getSegmentId()
+         )
  
-       val colCardinality: Array[Int] = listMetadata.get(listMetadata.size() - 1).getSegmentInfo
-         .getColumnCardinality
+         val colCardinality: Array[Int] = listMetadata.get(listMetadata.size() - 1).getSegmentInfo
+           .getColumnCardinality
  
-       val segmentProperties = new SegmentProperties(
-         listMetadata.get(listMetadata.size() - 1).getColumnInTable,
-         colCardinality
-       )
+         val segmentProperties = new SegmentProperties(
+           listMetadata.get(listMetadata.size() - 1).getColumnInTable,
+           colCardinality
+         )
  
-       val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
-         factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-         dataFileMetadataSegMapping
-       )
 -        val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, schemaName,
++        val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+           factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+           dataFileMetadataSegMapping
+         )
  
-       // fire a query and get the results.
-       var result2: util.List[RawResultIterator] = null
-       try {
-         result2 = exec.processTableBlocks()
-       } catch {
-         case e: Throwable =>
-           exec.clearDictionaryFromQueryModel
-           LOGGER.error(e)
-           if (null != e.getMessage) {
-             sys.error("Exception occurred in query execution :: " + e.getMessage)
-           } else {
-             sys.error("Exception occurred in query execution.Please check logs.")
-           }
-       }
+         // fire a query and get the results.
+         var result2: util.List[RawResultIterator] = null
+         try {
+           result2 = exec.processTableBlocks()
+         } catch {
+           case e: Throwable =>
+             exec.clearDictionaryFromQueryModel
+             LOGGER.error(e)
+             if (null != e.getMessage) {
+               sys.error("Exception occurred in query execution :: " + e.getMessage)
+             } else {
+               sys.error("Exception occurred in query execution.Please check logs.")
+             }
+         }
  
-       val mergeNumber = mergedLoadName
-         .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-           CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+         mergeNumber = mergedLoadName
+           .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+             CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+           )
+ 
 -        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(schemaName,
++        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+           factTableName,
+           carbonLoadModel.getTaskNo,
+           "0",
+           mergeNumber,
+           true
          )
  
-       val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
-         factTableName,
-         carbonLoadModel.getTaskNo,
-         "0",
-         mergeNumber
-       )
+         carbonLoadModel.setSegmentId(mergeNumber)
+         carbonLoadModel.setPartitionId("0")
+         val merger =
+           new RowResultMerger(result2,
 -            schemaName,
++            databaseName,
+             factTableName,
+             segmentProperties,
+             tempStoreLoc,
+             carbonLoadModel,
+             colCardinality
+           )
+         mergeStatus = merger.mergerSlice()
  
-       carbonLoadModel.setSegmentId(mergeNumber)
-       carbonLoadModel.setPartitionId("0")
-       val merger =
-         new RowResultMerger(result2,
-         databaseName,
-         factTableName,
-         segmentProperties,
-         tempStoreLoc,
-         carbonLoadModel,
-         colCardinality
-       )
-       val mergeStatus = merger.mergerSlice()
+       }
+       catch {
+         case e: Exception =>
+           LOGGER.error(e)
+           throw e
+       }
+       finally {
+         // delete temp location data
+         val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
+         try {
+           val isCompactionFlow = true
+           CarbonLoaderUtil
+             .deleteLocalDataLoadFolderLocation(carbonLoadModel, newSlice, isCompactionFlow)
+         } catch {
+           case e: Exception =>
+             LOGGER.error(e)
+         }
+       }
  
        var finished = false
  

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index 00bf5b2,0000000..5a1dedc
mode 100644,000000..100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@@ -1,253 -1,0 +1,290 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.carbondata.spark.rdd
 +
 +import java.util
 +
 +import scala.collection.JavaConverters._
 +import scala.reflect.ClassTag
 +
 +import org.apache.hadoop.conf.Configuration
 +import org.apache.hadoop.mapreduce.Job
 +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 +import org.apache.spark.rdd.RDD
 +import org.apache.spark.sql.hive.DistributionUtil
 +
 +import org.carbondata.common.CarbonIterator
 +import org.carbondata.common.logging.LogServiceFactory
 +import org.carbondata.core.cache.dictionary.Dictionary
 +import org.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
 +import org.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsRecorder}
 +import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 +import org.carbondata.scan.executor.QueryExecutorFactory
 +import org.carbondata.scan.expression.Expression
 +import org.carbondata.scan.model.QueryModel
 +import org.carbondata.scan.result.BatchResult
 +import org.carbondata.scan.result.iterator.ChunkRowIterator
 +import org.carbondata.spark.RawValue
 +import org.carbondata.spark.load.CarbonLoaderUtil
 +import org.carbondata.spark.util.QueryPlanUtil
 +
 +class CarbonSparkPartition(rddId: Int, val idx: Int,
 +  val locations: Array[String],
 +  val tableBlockInfos: util.List[TableBlockInfo])
 +  extends Partition {
 +
 +  override val index: Int = idx
 +
 +  // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
 +  override def hashCode(): Int = {
 +    41 * (41 + rddId) + idx
 +  }
 +}
 +
 + /**
 +  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
 +  * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
 +  * level filtering in driver side.
 +  */
 +class CarbonScanRDD[V: ClassTag](
 +  sc: SparkContext,
 +  queryModel: QueryModel,
 +  filterExpression: Expression,
 +  keyClass: RawValue[V],
 +  @transient conf: Configuration,
 +  tableCreationTime: Long,
 +  schemaLastUpdatedTime: Long,
 +  baseStoreLocation: String)
 +  extends RDD[V](sc, Nil) with Logging {
 +
 +  val defaultParallelism = sc.defaultParallelism
 +
 +  override def getPartitions: Array[Partition] = {
 +    val statisticRecorder = new QueryStatisticsRecorder(queryModel.getQueryId)
 +    val startTime = System.currentTimeMillis()
 +    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
 +      QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
 +
 +    val result = new util.ArrayList[Partition](defaultParallelism)
 +    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 +    // set filter resolver tree
 +    try {
 +      // before applying filter check whether segments are available in the table.
 +      val splits = carbonInputFormat.getSplits(job)
 +      if (!splits.isEmpty) {
-         var filterResolver = carbonInputFormat
++        val filterResolver = carbonInputFormat
 +          .getResolvedFilter(job.getConfiguration, filterExpression)
 +        CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
 +        queryModel.setFilterExpressionResolverTree(filterResolver)
 +      }
 +    }
 +    catch {
 +      case e: Exception =>
 +        LOGGER.error(e)
 +        sys.error("Exception occurred in query execution :: " + e.getMessage)
 +    }
 +    // get splits
 +    val splits = carbonInputFormat.getSplits(job)
 +    if (!splits.isEmpty) {
 +      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 +
 +      val blockList = carbonInputSplits.map(inputSplit =>
 +        new TableBlockInfo(inputSplit.getPath.toString,
 +          inputSplit.getStart, inputSplit.getSegmentId,
 +          inputSplit.getLocations, inputSplit.getLength
 +        ).asInstanceOf[Distributable]
 +      )
 +      if (blockList.nonEmpty) {
 +        // group blocks to nodes, tasks
 +        val startTime = System.currentTimeMillis
 +        var statistic = new QueryStatistic
 +        val activeNodes = DistributionUtil
 +          .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
 +        val nodeBlockMapping =
 +          CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
 +            activeNodes.toList.asJava
 +          )
 +        val timeElapsed: Long = System.currentTimeMillis - startTime
 +        statistic.addStatistics("Total Time taken in block(s) allocation", System.currentTimeMillis)
 +        statisticRecorder.recordStatistics(statistic);
 +        statistic = new QueryStatistic
 +        var i = 0
 +        // Create Spark Partition for each task and assign blocks
 +        nodeBlockMapping.asScala.foreach { entry =>
 +          entry._2.asScala.foreach { blocksPerTask => {
 +            val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
 +            if (blocksPerTask.size() != 0) {
 +              result
 +                .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
 +              i += 1
 +            }
 +          }
 +          }
 +        }
 +        val noOfBlocks = blockList.size
 +        val noOfNodes = nodeBlockMapping.size
 +        val noOfTasks = result.size()
 +        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
 +                + s"parallelism: $defaultParallelism , " +
 +                s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
 +        )
 +        statistic.addStatistics("Time taken to identify Block(s) to scan", System.currentTimeMillis)
 +        statisticRecorder.recordStatistics(statistic);
 +        statisticRecorder.logStatistics
 +        result.asScala.foreach { r =>
 +          val cp = r.asInstanceOf[CarbonSparkPartition]
 +          logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
 +                  + ", No.Of Blocks : " + cp.tableBlockInfos.size()
 +          )
 +        }
 +      } else {
 +        logInfo("No blocks identified to scan")
 +        val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +        result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +      }
 +    }
 +    else {
 +      logInfo("No valid segments found to scan")
 +      val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
 +      result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
 +    }
 +    result.toArray(new Array[Partition](result.size()))
 +  }
 +
 +   override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
 +     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 +     val iter = new Iterator[V] {
 +       var rowIterator: CarbonIterator[Array[Any]] = _
 +       var queryStartTime: Long = 0
 +       try {
 +         val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
 +         if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
 +           queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
 +           // fill table block info
 +           queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
 +           queryStartTime = System.currentTimeMillis
 +
 +           val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
 +           logInfo("*************************" + carbonPropertiesFilePath)
 +           if (null == carbonPropertiesFilePath) {
 +             System.setProperty("carbon.properties.filepath",
 +               System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
 +           }
 +           // execute query
 +           rowIterator = new ChunkRowIterator(
 +             QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
 +               asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
 +
 +         }
 +       } catch {
 +         case e: Exception =>
 +           LOGGER.error(e)
 +           if (null != e.getMessage) {
 +             sys.error("Exception occurred in query execution :: " + e.getMessage)
 +           } else {
 +             sys.error("Exception occurred in query execution.Please check logs.")
 +           }
 +       }
 +
 +       var havePair = false
 +       var finished = false
 +       var recordCount = 0
 +
 +       override def hasNext: Boolean = {
 +         if (!finished && !havePair) {
 +           finished = (null == rowIterator) || (!rowIterator.hasNext)
 +           havePair = !finished
 +         }
 +         if (finished) {
 +           clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-            if(null!=queryModel.getStatisticsRecorder) {
++           if (null != queryModel.getStatisticsRecorder) {
++             val queryStatistic = new QueryStatistic
++             queryStatistic
++               .addStatistics("Total Time taken to execute the query in executor Side",
++                 System.currentTimeMillis - queryStartTime
++               )
++             queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
 +             queryModel.getStatisticsRecorder.logStatistics();
 +           }
 +         }
 +         !finished
 +       }
 +
 +       override def next(): V = {
 +         if (!hasNext) {
 +           throw new java.util.NoSuchElementException("End of stream")
 +         }
 +         havePair = false
 +         recordCount += 1
 +         if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
 +           clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-            if(null!=queryModel.getStatisticsRecorder) {
++           if (null != queryModel.getStatisticsRecorder) {
++             val queryStatistic = new QueryStatistic
++             queryStatistic
++               .addStatistics("Total Time taken to execute the query in executor Side",
++                 System.currentTimeMillis - queryStartTime
++               )
++             queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
 +             queryModel.getStatisticsRecorder.logStatistics();
 +           }
 +         }
 +         keyClass.getValue(rowIterator.next())
 +       }
 +       def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
 +         if (null != columnToDictionaryMap) {
 +           org.carbondata.spark.util.CarbonQueryUtil
 +             .clearColumnDictionaryCache(columnToDictionaryMap)
 +         }
 +       }
 +     }
 +     iter
 +   }
 +
 +   /**
 +    * Get the preferred locations where to launch this task.
 +    */
-   override def getPreferredLocations(partition: Partition): Seq[String] = {
-     val theSplit = partition.asInstanceOf[CarbonSparkPartition]
-     theSplit.locations.filter(_ != "localhost")
-   }
++   override def getPreferredLocations(partition: Partition): Seq[String] = {
++     val theSplit = partition.asInstanceOf[CarbonSparkPartition]
++     val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
++     val tableBlocks = theSplit.tableBlockInfos
++     // node name and count mapping
++     val blockMap = new util.LinkedHashMap[String, Integer]()
++
++     tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
++       location => {
++         if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
++           val currentCount = blockMap.get(location)
++           if (currentCount == null) {
++             blockMap.put(location, 1)
++           } else {
++             blockMap.put(location, currentCount + 1)
++           }
++         }
++       }
++     )
++     )
++
++     val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
++       nodeCount1.getValue > nodeCount2.getValue
++     }
++     )
++
++     val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
++     firstOptionLocation ++ sortedNodesList
++   }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 1693008,b3effd3..5406e77
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@@ -79,22 -93,40 +79,35 @@@ object CarbonScalaUtil 
      }
    }
  
 -  def convertCarbonToSparkDataType(dataType: DataType): types.DataType = {
 -    dataType match {
 -      case DataType.STRING => StringType
 -      case DataType.INT => IntegerType
 -      case DataType.LONG => LongType
 -      case DataType.DOUBLE => DoubleType
 -      case DataType.BOOLEAN => BooleanType
 -      case DataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
 -      case DataType.TIMESTAMP => TimestampType
 +  def getKettleHomePath(sqlContext: SQLContext): String = {
 +    val carbonHome = System.getenv("CARBON_HOME")
 +    var kettleHomePath: String = null
 +    if (carbonHome != null) {
 +      kettleHomePath = System.getenv("CARBON_HOME") + "/processing/carbonplugins"
 +    }
 +    if (kettleHomePath == null) {
 +      kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
      }
 +    if (null == kettleHomePath) {
 +      kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
 +    }
 +    kettleHomePath
    }
  
+   def updateDataType(
+       currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = {
+     currentDataType match {
+       case decimal: DecimalType =>
+         val scale = currentDataType.asInstanceOf[DecimalType].scale
+         DecimalType(DecimalType.MAX_PRECISION, scale)
+       case _ =>
+         currentDataType
+     }
+   }
+ 
+   case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
+ 
    object CarbonSparkUtil {
 -    def createBaseRDD(carbonContext: CarbonContext, carbonTable: CarbonTable): TransformHolder = {
 -      val relation = CarbonEnv.getInstance(carbonContext).carbonCatalog
 -        .lookupRelation1(Option(carbonTable.getDatabaseName),
 -          carbonTable.getFactTableName, None)(carbonContext).asInstanceOf[CarbonRelation]
 -      val rdd = new SchemaRDD(carbonContext, relation)
 -      rdd.registerTempTable(carbonTable.getFactTableName)
 -      TransformHolder(rdd, createSparkMeta(carbonTable))
 -    }
  
      def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
        val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
index b381079,0e333dd..780c022
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
@@@ -80,11 -92,8 +92,9 @@@ class TestLoadDataWithDictionaryExclude
    }
  
    test("test load data with dictionary exclude & include and with empty dimension") {
 +    sql("select ID from t3").show()
      checkAnswer(
-       sql("select ID from t3"), Seq(Row(1), Row(2), Row(3), Row(4), Row(5), Row(6), Row(7),
-         Row(8), Row(9), Row(10), Row(11), Row(12), Row(13), Row(14), Row(15), Row(16), Row
-         (17), Row(18), Row(19), Row(20))
+       sql("select ID from exclude_include_t3"), sql("select ID from exclude_include_hive_t3")
      )
    }
  

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
index 483a766,db67c86..a29cefb
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
@@@ -60,45 -61,82 +60,82 @@@ class TimestampDataTypeDirectDictionary
          .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
        val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
          .getCanonicalPath
-       var csvFilePath = currentDirectory + "/src/test/resources/datasample.csv"
+       val csvFilePath = currentDirectory + "/src/test/resources/datasample.csv"
 -      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryCube OPTIONS" +
 +      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable OPTIONS" +
          "('DELIMITER'= ',', 'QUOTECHAR'= '\"')");
- 
      } catch {
        case x: Throwable => CarbonProperties.getInstance()
          .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
      }
    }
  
-   test("select doj from directDictionaryTable") {
+   test("test direct dictionary for not null condition") {
      checkAnswer(
-       sql("select doj from directDictionaryTable"),
 -      sql("select doj from directDictionaryCube where doj is not null"),
++      sql("select doj from directDictionaryTable where doj is not null"),
        Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09.0")),
          Row(Timestamp.valueOf("2016-04-14 15:00:09.0"))
        )
      )
    }
  
+   test("test direct dictionary for getting all the values") {
+     checkAnswer(
+       sql("select doj from directDictionaryCube"),
+       Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09.0")),
+         Row(Timestamp.valueOf("2016-04-14 15:00:09.0")),
+         Row(null)
+       )
+     )
+   }
+ 
+   test("test direct dictionary for not equals condition") {
+     checkAnswer(
+       sql("select doj from directDictionaryCube where doj != '2016-04-14 15:00:09.0'"),
+       Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09.0"))
+       )
+     )
+   }
+ 
+   test("test direct dictionary for null condition") {
+     checkAnswer(
+       sql("select doj from directDictionaryCube where doj is null"),
+       Seq(Row(null)
+       )
+     )
+   }
  
 -  test("select doj from directDictionaryCube with equals filter") {
 +  test("select doj from directDictionaryTable with equals filter") {
      checkAnswer(
 -      sql("select doj from directDictionaryCube where doj='2016-03-14 15:00:09'"),
 +      sql("select doj from directDictionaryTable where doj='2016-03-14 15:00:09'"),
        Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09")))
      )
  
    }
    
-     test("select doj from directDictionaryTable with greater than filter") {
 -  test("select doj from directDictionaryCube with regexp_replace equals filter") {
++  test("select doj from directDictionaryTable with regexp_replace equals filter") {
+     checkAnswer(
 -      sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') = '2016/03/14 15:00:09'"),
++      sql("select doj from directDictionaryTable where regexp_replace(doj, '-', '/') = '2016/03/14 15:00:09'"),
+       Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09")))
+     )
+   }
 -  
 -  test("select doj from directDictionaryCube with regexp_replace NOT IN filter") {
++
++  test("select doj from directDictionaryTable with regexp_replace NOT IN filter") {
+     checkAnswer(
 -      sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14 15:00:09')"),
++      sql("select doj from directDictionaryTable where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14 15:00:09')"),
+       Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")), Row(null))
+     )
+   }
 -  
 -  test("select doj from directDictionaryCube with greater than filter") {
++
++  test("select doj from directDictionaryTable with greater than filter") {
      checkAnswer(
 -      sql("select doj from directDictionaryCube where doj>'2016-03-14 15:00:09'"),
 +      sql("select doj from directDictionaryTable where doj>'2016-03-14 15:00:09'"),
        Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")))
      )
- 
    }
  
 -  test("select count(doj) from directDictionaryCube") {
 +  test("select count(doj) from directDictionaryTable") {
      checkAnswer(
 -      sql("select count(doj) from directDictionaryCube"),
 +      sql("select count(doj) from directDictionaryTable"),
        Seq(Row(2))
      )
    }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryWithNoDictTestCase.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeNullDataTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index 161a19e,7cb6dfd..4075e60
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@@ -41,14 -41,26 +41,26 @@@ class AllDataTypesTestCaseFilter extend
  
    }
  
 -  test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization") {
 +  test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter where empname in ('arvind','ayushi') group by empno,empname,utilization") {
      checkAnswer(
 -      sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"),
 -      sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization"))
 +      sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"),
 +      sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization"))
    }
- 
+   
+   test("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')") {
+     checkAnswer(
+       sql("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"),
+       sql("select empno,empname from alldatatypescubeFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"))
+   }
+   
+   test("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'") {
+     checkAnswer(
+       sql("select empno,empname from alldatatypescubeFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"),
+       sql("select empno,empname from alldatatypescubeFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"))
+   }
+   
    override def afterAll {
 -    sql("drop table alldatatypescubeFilter")
 -    sql("drop table alldatatypescubeFilter_hive")
 +    sql("drop table alldatatypestableFilter")
 +    sql("drop table alldatatypestableFilter_hive")
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
index 10aa1fd,0000000..0fae687
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
@@@ -1,73 -1,0 +1,94 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.lcm.locks;
 +
 +import org.carbondata.core.carbon.CarbonTableIdentifier;
 +import org.carbondata.core.constants.CarbonCommonConstants;
 +import org.carbondata.core.util.CarbonProperties;
 +
 +/**
 + * This class is a Lock factory class which is used to provide lock objects.
 + * Using this lock object client can request the lock and unlock.
 + */
 +public class CarbonLockFactory {
 +
 +  /**
 +   * lockTypeConfigured to check if zookeeper feature is enabled or not for carbon.
 +   */
 +  private static String lockTypeConfigured;
 +
 +  static {
-     CarbonLockFactory.updateZooKeeperLockingStatus();
++    CarbonLockFactory.getLockTypeConfigured();
 +  }
 +
 +  /**
 +   * This method will determine the lock type.
 +   *
 +   * @param tableIdentifier
 +   * @param lockFile
 +   * @return
 +   */
 +  public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier tableIdentifier,
 +      String lockFile) {
 +    switch (lockTypeConfigured.toUpperCase()) {
 +      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
 +        return new LocalFileLock(tableIdentifier, lockFile);
 +
 +      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
 +        return new ZooKeeperLocking(tableIdentifier, lockFile);
 +
 +      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
 +        return new HdfsFileLock(tableIdentifier, lockFile);
 +
 +      default:
 +        throw new UnsupportedOperationException("Not supported the lock type");
 +    }
 +  }
 +
 +  /**
++   *
++   * @param locFileLocation
++   * @param lockFile
++   * @return carbon lock
++   */
++  public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile) {
++    switch (lockTypeConfigured.toUpperCase()) {
++      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
++        return new LocalFileLock(locFileLocation, lockFile);
++
++      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
++        return new ZooKeeperLocking(locFileLocation, lockFile);
++
++      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
++        return new HdfsFileLock(locFileLocation, lockFile);
++
++      default:
++        throw new UnsupportedOperationException("Not supported the lock type");
++    }
++  }
++
++  /**
 +   * This method will set the zookeeper status whether zookeeper to be used for locking or not.
 +   */
-   private static void updateZooKeeperLockingStatus() {
++  private static void getLockTypeConfigured() {
 +    lockTypeConfigured = CarbonProperties.getInstance()
 +        .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT);
- 
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
index b45ae96,0000000..3305477
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
@@@ -1,91 -1,0 +1,106 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.lcm.locks;
 +
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +
++import org.carbondata.common.logging.LogService;
++import org.carbondata.common.logging.LogServiceFactory;
 +import org.carbondata.core.carbon.CarbonTableIdentifier;
 +import org.carbondata.core.constants.CarbonCommonConstants;
 +import org.carbondata.core.datastorage.store.impl.FileFactory;
++import org.carbondata.core.util.CarbonProperties;
 +
 +/**
 + * This class is used to handle the HDFS File locking.
 + * This is acheived using the concept of acquiring the data out stream using Append option.
 + */
 +public class HdfsFileLock extends AbstractCarbonLock {
 +
++  private static final LogService LOGGER =
++             LogServiceFactory.getLogService(HdfsFileLock.class.getName());
 +  /**
 +   * location hdfs file location
 +   */
 +  private String location;
 +
 +  private DataOutputStream dataOutputStream;
 +
 +  public static String tmpPath;
 +
 +  static {
-     tmpPath = System.getProperty("hadoop.tmp.dir");
++    tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
++               System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
++  }
++
++  /**
++   * @param lockFileLocation
++   * @param lockFile
++   */
++  public HdfsFileLock(String lockFileLocation, String lockFile) {
++    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation
++        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
++    LOGGER.info("HDFS lock path:"+this.location);
++    initRetry();
 +  }
 +
 +  /**
 +   * @param tableIdentifier
 +   * @param lockFile
 +   */
 +  public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
-     this.location =
-         tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
-             + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName()
-             + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
++    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
++        .getTableName(), lockFile);
 +    initRetry();
 +  }
 +
 +  /* (non-Javadoc)
 +   * @see org.carbondata.core.locks.ICarbonLock#lock()
 +   */
 +  @Override public boolean lock() {
 +    try {
 +      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
 +        FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
 +      }
 +      dataOutputStream =
 +          FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
 +
 +      return true;
 +
 +    } catch (IOException e) {
 +      return false;
 +    }
 +  }
 +
 +  /* (non-Javadoc)
 +   * @see org.carbondata.core.locks.ICarbonLock#unlock()
 +   */
 +  @Override public boolean unlock() {
 +    if (null != dataOutputStream) {
 +      try {
 +        dataOutputStream.close();
 +      } catch (IOException e) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
index 88f9a23,0000000..15374a3
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
@@@ -1,151 -1,0 +1,159 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.lcm.locks;
 +
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.channels.FileChannel;
 +import java.nio.channels.FileLock;
 +import java.nio.channels.OverlappingFileLockException;
 +
 +import org.carbondata.common.logging.LogService;
 +import org.carbondata.common.logging.LogServiceFactory;
 +import org.carbondata.core.carbon.CarbonTableIdentifier;
 +import org.carbondata.core.constants.CarbonCommonConstants;
 +import org.carbondata.core.datastorage.store.impl.FileFactory;
 +
 +/**
 + * This class handles the file locking in the local file system.
 + * This will be handled using the file channel lock API.
 + */
 +public class LocalFileLock extends AbstractCarbonLock {
 +  /**
 +   * location is the location of the lock file.
 +   */
 +  private String location;
 +
 +  /**
 +   * fileOutputStream of the local lock file
 +   */
 +  private FileOutputStream fileOutputStream;
 +
 +  /**
 +   * channel is the FileChannel of the lock file.
 +   */
 +  private FileChannel channel;
 +
 +  /**
 +   * fileLock NIO FileLock Object
 +   */
 +  private FileLock fileLock;
 +
 +  /**
 +   * lock file
 +   */
 +  private String lockFile;
 +
 +  public static final String tmpPath;
 +
 +  private String tableName;
 +
 +  private String databaseName;
 +
 +  /**
 +   * LOGGER for  logging the messages.
 +   */
 +  private static final LogService LOGGER =
 +      LogServiceFactory.getLogService(LocalFileLock.class.getName());
 +
 +  static {
 +    tmpPath = System.getProperty("java.io.tmpdir");
 +  }
 +
 +  /**
++   * @param lockFileLocation
++   * @param lockFile
++   */
++  public LocalFileLock(String lockFileLocation, String lockFile) {
++    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation;
++    this.lockFile = lockFile;
++    initRetry();
++  }
++
++  /**
 +   * @param tableIdentifier
 +   * @param lockFile
 +   */
 +  public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
-     this.location =
-         tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
-             + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName();
-     this.lockFile = lockFile;
++    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
++        .getTableName(), lockFile);
 +    initRetry();
 +  }
 +
 +  /**
 +   * Lock API for locking of the file channel of the lock file.
 +   *
 +   * @return
 +   */
 +  @Override public boolean lock() {
 +    try {
 +      if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) {
 +        FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath));
 +      }
 +      String lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR +
 +          lockFile;
 +      if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) {
 +        FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
 +      }
 +
 +      fileOutputStream = new FileOutputStream(lockFilePath);
 +      channel = fileOutputStream.getChannel();
 +      try {
 +        fileLock = channel.tryLock();
 +      } catch (OverlappingFileLockException e) {
 +        return false;
 +      }
 +      if (null != fileLock) {
 +        return true;
 +      } else {
 +        return false;
 +      }
 +    } catch (IOException e) {
 +      return false;
 +    }
 +
 +  }
 +
 +  /**
 +   * Unlock API for unlocking of the acquired lock.
 +   *
 +   * @return
 +   */
 +  @Override public boolean unlock() {
 +    boolean status;
 +    try {
 +      if (null != fileLock) {
 +        fileLock.release();
 +      }
 +      status = true;
 +    } catch (IOException e) {
 +      status = false;
 +    } finally {
 +      if (null != fileOutputStream) {
 +        try {
 +          fileOutputStream.close();
 +        } catch (IOException e) {
 +          LOGGER.error(e.getMessage());
 +        }
 +      }
 +    }
 +    return status;
 +  }
 +
 +}