Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:48 UTC

[10/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
new file mode 100644
index 0000000..1d8d6b2
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.util.SparkUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
+import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This partition class is used to split by TableSplit
+ *
+ */
+class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit,
+    val blocksDetails: Array[BlockDetails])
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
+  val partitionBlocksDetail = blocksDetails
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * This partition class is used to split by host
+ *
+ */
+class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
+    val blocksDetails: Array[BlockDetails])
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = host
+  val nodeBlocksDetail = blocksDetails
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+class SparkPartitionLoader(model: CarbonLoadModel,
+    splitIndex: Int,
+    storePath: String,
+    kettleHomePath: String,
+    loadCount: Int,
+    loadMetadataDetails: LoadMetadataDetails) {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  var storeLocation: String = ""
+
+  def initialize(): Unit = {
+    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+    if (null == carbonPropertiesFilePath) {
+      System.setProperty("carbon.properties.filepath",
+        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+    CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
+    CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
+    CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
+    CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
+    CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
+    CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
+    CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
+    CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
+
+    // this property is used to determine whether the temp location for carbon is inside
+    // the container temp dir or the yarn application directory.
+    val carbonUseLocalDir = CarbonProperties.getInstance()
+      .getProperty("carbon.use.local.dir", "false")
+    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+      if (null != storeLocations && storeLocations.nonEmpty) {
+        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+      }
+      if (storeLocation == null) {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+    } else {
+      storeLocation = System.getProperty("java.io.tmpdir")
+    }
+    storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
+  }
+
+  def run(): Unit = {
+    try {
+      CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
+        kettleHomePath)
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+    } catch {
+      case e: DataLoadingException => if (e.getErrorCode ==
+                                          DataProcessorConstants.BAD_REC_FOUND) {
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+        LOGGER.info("Bad Record Found")
+      } else {
+        throw e
+      }
+      case e: Exception =>
+        throw e
+    } finally {
+      // delete temp location data
+      try {
+        val isCompaction = false
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, isCompaction)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(e, "Failed to delete local data")
+      }
+      if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+        loadMetadataDetails.getLoadStatus)) {
+        if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+          .equals(loadMetadataDetails.getLoadStatus)) {
+          LOGGER.info("DataLoad complete")
+          LOGGER.info("Data Load partially successful with LoadCount:" + loadCount)
+        } else {
+          LOGGER.info("DataLoad complete")
+          LOGGER.info("Data Loaded successfully with LoadCount:" + loadCount)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
+            model.getPartitionId)
+        }
+      }
+    }
+  }
+}
+
+/**
+ * Use this RDD class to load CSV data files
+ *
+ * @param sc                    The SparkContext to associate the RDD with.
+ * @param result                Output result
+ * @param carbonLoadModel       Carbon load model which contains the load info
+ * @param storePath             The store location
+ * @param kettleHomePath        The kettle home path
+ * @param columinar             whether the store is columnar
+ * @param loadCount             Current load count
+ * @param tableCreationTime     Time of creating table
+ * @param schemaLastUpdatedTime Time of last schema update
+ * @param blocksGroupBy         Array of blocks grouped by partition or host
+ * @param isTableSplitPartition Whether using table split partition
+ * @tparam K Class of the key associated with the Result.
+ * @tparam V Class of the value associated with the Result.
+ */
+class DataFileLoaderRDD[K, V](
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    storePath: String,
+    kettleHomePath: String,
+    columinar: Boolean,
+    loadCount: Integer,
+    tableCreationTime: Long,
+    schemaLastUpdatedTime: Long,
+    blocksGroupBy: Array[(String, Array[BlockDetails])],
+    isTableSplitPartition: Boolean) extends RDD[(K, V)](sc, Nil) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  override def getPartitions: Array[Partition] = {
+    if (isTableSplitPartition) {
+      // for table split partition
+      var splits = Array[TableSplit]()
+      if (carbonLoadModel.isDirectLoad) {
+        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
+      } else {
+        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+          carbonLoadModel.getTableName, null)
+      }
+
+      splits.zipWithIndex.map { case (split, index) =>
+        // filter by this partition's unique id; only one entry will match, so take element 0
+        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
+          uniqueId == split.getPartition.getUniqueID
+        }(0)._2
+        new CarbonTableSplitPartition(id, index, split, blocksDetails)
+      }
+    } else {
+      // for node partition
+      blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
+        new CarbonNodePartition(id, index, uniqueId, blockDetails)
+      }
+    }
+  }
+
+  override def checkpoint() {
+    // Do nothing. Hadoop RDD should not be checkpointed.
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val iter = new Iterator[(K, V)] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      var model: CarbonLoadModel = _
+      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+                               theSplit.index
+      try {
+        loadMetadataDetails.setPartitionCount(partitionID)
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        setModelAndBlocksInfo()
+        val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
+          kettleHomePath, loadCount, loadMetadataDetails)
+        loader.initialize
+        if (model.isRetentionRequest) {
+          recreateAggregationTableForRetention
+        } else if (model.isAggLoadRequest) {
+          loadMetadataDetails.setLoadStatus(createManualAggregateTable)
+        } else {
+          loader.run()
+        }
+      } catch {
+        case e: Exception =>
+          logInfo("DataLoad failure")
+          LOGGER.error(e)
+          throw e
+      }
+
+      def setModelAndBlocksInfo(): Unit = {
+        if (isTableSplitPartition) {
+          // for table split partition
+          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+          logInfo("Input split: " + split.serializableHadoopSplit.value)
+          val blocksID = gernerateBlocksID
+          carbonLoadModel.setBlocksID(blocksID)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          if (carbonLoadModel.isDirectLoad) {
+            model = carbonLoadModel.getCopyWithPartition(
+              split.serializableHadoopSplit.value.getPartition.getUniqueID,
+              split.serializableHadoopSplit.value.getPartition.getFilesPath,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(
+              split.serializableHadoopSplit.value.getPartition.getUniqueID)
+          }
+          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+          // get this partition's data blocks and put them into the global static map
+          GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
+          StandardLogService.setThreadName(partitionID, null)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+            partitionID, split.partitionBlocksDetail.length)
+        } else {
+          // for node partition
+          val split = theSplit.asInstanceOf[CarbonNodePartition]
+          logInfo("Input split: " + split.serializableHadoopSplit)
+          logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+            split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+          val blocksID = gernerateBlocksID
+          carbonLoadModel.setBlocksID(blocksID)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          // set this node's blocks info into the global static map
+          GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
+          if (carbonLoadModel.isDirectLoad) {
+            val filelist: java.util.List[String] = new java.util.ArrayList[String](
+              CarbonCommonConstants.CONSTANT_SIZE_TEN)
+            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(partitionID)
+          }
+          StandardLogService.setThreadName(blocksID, null)
+        }
+      }
+
+      /**
+       * generate a unique blocks id for this task
+       *
+       * @return the generated blocks id
+       */
+      def gernerateBlocksID: String = {
+        if (isTableSplitPartition) {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+            .getPartition.getUniqueID + "_" + UUID.randomUUID()
+        } else {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          UUID.randomUUID()
+        }
+      }
+
+      def checkAndLoadAggregationTable: String = {
+        val schema = model.getCarbonDataLoadSchema
+        val aggTables = schema.getCarbonTable.getAggregateTablesName
+        if (null != aggTables && !aggTables.isEmpty) {
+          val details = model.getLoadMetadataDetails.asScala.toArray
+          val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
+          var listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+          listOfLoadFolders = CarbonLoaderUtil.addNewSliceNameToList(newSlice, listOfLoadFolders)
+          val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+          var listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
+          listOfAllLoadFolders = CarbonLoaderUtil
+            .addNewSliceNameToList(newSlice, listOfAllLoadFolders)
+          val copyListOfLoadFolders = listOfLoadFolders.asScala.toList
+          val copyListOfUpdatedLoadFolders = listOfUpdatedLoadFolders.asScala.toList
+          loadTableSlices(listOfAllLoadFolders, details)
+          val loadFolders = Array[String]()
+          loadMetadataDetails.setLoadStatus(iterateOverAggTables(aggTables,
+            copyListOfLoadFolders.asJava, copyListOfUpdatedLoadFolders.asJava, loadFolders))
+          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+            loadMetadataDetails.getLoadStatus)) {
+            // remove the current slice from memory, not the table
+            CarbonLoaderUtil
+              .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
+            logInfo(s"Aggregate table creation failed")
+          } else {
+            logInfo("Aggregate tables creation successfull")
+          }
+        }
+        loadMetadataDetails.getLoadStatus
+      }
+
+      def loadTableSlices(listOfAllLoadFolders: java.util.List[String],
+          loadMetadataDetails: Array[LoadMetadataDetails]) = {
+        CarbonProperties.getInstance().addProperty("carbon.cache.used", "false")
+        // TODO: Implement it
+      }
+
+      def createManualAggregateTable: String = {
+        val details = model.getLoadMetadataDetails.asScala.toArray
+        val listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
+        val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+        val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+        loadTableSlices(listOfAllLoadFolders, details)
+        val loadFolders = Array[String]()
+        val aggTable = model.getAggTableName
+        loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+          listOfUpdatedLoadFolders, loadFolders))
+        if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+          loadMetadataDetails.getLoadStatus)) {
+          logInfo(s"Aggregate table creation failed :: $aggTable")
+        } else {
+          logInfo(s"Aggregate table creation successfull :: $aggTable")
+        }
+        loadMetadataDetails.getLoadStatus
+      }
+
+      def recreateAggregationTableForRetention = {
+        val schema = model.getCarbonDataLoadSchema
+        val aggTables = schema.getCarbonTable.getAggregateTablesName
+        if (null != aggTables && !aggTables.isEmpty) {
+          val details = model.getLoadMetadataDetails.asScala.toArray
+          val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+          val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+          val listOfAllLoadFolder = CarbonQueryUtil.getListOfSlices(details)
+          loadTableSlices(listOfAllLoadFolder, details)
+          val loadFolders = Array[String]()
+          iterateOverAggTables(aggTables, listOfLoadFolders, listOfUpdatedLoadFolders, loadFolders)
+        }
+      }
+
+      // TODO Aggregate table needs to be handled
+      def iterateOverAggTables(aggTables: java.util.List[String],
+          listOfLoadFolders: java.util.List[String],
+          listOfUpdatedLoadFolders: java.util.List[String],
+          loadFolders: Array[String]): String = {
+        model.setAggLoadRequest(true)
+        aggTables.asScala.foreach { aggTable =>
+          model.setAggTableName(aggTable)
+          loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+            listOfUpdatedLoadFolders, loadFolders))
+          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+            loadMetadataDetails.getLoadStatus)) {
+            logInfo(s"Aggregate table creation failed :: aggTable")
+            return loadMetadataDetails.getLoadStatus
+          }
+        }
+        loadMetadataDetails.getLoadStatus
+      }
+
+      def loadAggregationTable(listOfLoadFolders: java.util.List[String],
+          listOfUpdatedLoadFolders: java.util.List[String],
+          loadFolders: Array[String]): String = {
+        // TODO: Implement it
+        loadMetadataDetails.getLoadStatus
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    if (isTableSplitPartition) {
+      // for table split partition
+      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+      location
+    } else {
+      // for node partition
+      val theSplit = split.asInstanceOf[CarbonNodePartition]
+      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+      logInfo("Preferred Location for split: " + firstOptionLocation.head)
+      val blockMap = new util.LinkedHashMap[String, Integer]()
+      val tableBlocks = theSplit.blocksDetails
+      tableBlocks.foreach { tableBlock =>
+        tableBlock.getLocations.foreach { location =>
+          if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
+            val currentCount = blockMap.get(location)
+            if (currentCount == null) {
+              blockMap.put(location, 1)
+            } else {
+              blockMap.put(location, currentCount + 1)
+            }
+          }
+        }
+      }
+
+      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+        nodeCount1.getValue > nodeCount2.getValue
+      }
+      )
+
+      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+      firstOptionLocation ++ sortedNodesList
+    }
+  }
+
+}
+
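[Editor's note] A minimal, hypothetical sketch of how the driver side might materialize DataFileLoaderRDD once a CarbonLoadModel and the per-host block mapping have been prepared by the existing load-command flow. The helper name runDataFileLoad and the literal argument values are illustrative assumptions, not part of this commit.

import org.apache.spark.SparkContext
import org.apache.carbondata.core.load.BlockDetails
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.DataLoadResult
import org.apache.carbondata.spark.rdd.DataFileLoaderRDD

object DataFileLoadSketch {
  // Hypothetical helper: the load model and the per-host block mapping are assumed
  // to be prepared by the load command; only the RDD wiring is shown here.
  def runDataFileLoad[K, V](
      sc: SparkContext,
      result: DataLoadResult[K, V],
      model: CarbonLoadModel,
      storePath: String,
      kettleHomePath: String,
      blocks: Array[(String, Array[BlockDetails])]): Array[(K, V)] = {
    new DataFileLoaderRDD[K, V](
      sc = sc,
      result = result,
      carbonLoadModel = model,
      storePath = storePath,
      kettleHomePath = kettleHomePath,
      columinar = true,                                   // columnar storage
      loadCount = 0,                                      // illustrative segment id
      tableCreationTime = System.currentTimeMillis(),
      schemaLastUpdatedTime = System.currentTimeMillis(),
      blocksGroupBy = blocks,
      isTableSplitPartition = false                       // partition by host, not by TableSplit
    ).collect()                                           // one (key, load status) pair per partition
  }
}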
+/**
+ * Use this RDD class to load data from an RDD of rows (e.g. a DataFrame)
+ *
+ * @param sc                    The SparkContext to associate the RDD with.
+ * @param result                Output result
+ * @param carbonLoadModel       Carbon load model which contains the load info
+ * @param storePath             The store location
+ * @param kettleHomePath        The kettle home path
+ * @param columinar             whether the store is columnar
+ * @param loadCount             Current load count
+ * @param tableCreationTime     Time of creating the table
+ * @param schemaLastUpdatedTime Time of the last schema update
+ * @param prev                  The parent coalesced RDD of rows to load
+ * @tparam K Class of the key associated with the Result.
+ * @tparam V Class of the value associated with the Result.
+ */
+class DataFrameLoaderRDD[K, V](
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    storePath: String,
+    kettleHomePath: String,
+    columinar: Boolean,
+    loadCount: Integer,
+    tableCreationTime: Long,
+    schemaLastUpdatedTime: Long,
+    prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  @DeveloperApi
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val resultIter = new Iterator[(K, V)] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+                               theSplit.index
+      try {
+        loadMetadataDetails.setPartitionCount(partitionID)
+        carbonLoadModel.setPartitionId(partitionID)
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
+          kettleHomePath, loadCount, loadMetadataDetails)
+        loader.initialize
+        val rddIteratorKey = UUID.randomUUID().toString
+        try {
+          RddInputUtils.put(rddIteratorKey,
+              new PartitionIterator(
+                  firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
+                  carbonLoadModel,
+                  context))
+          carbonLoadModel.setRddIteratorKey(rddIteratorKey)
+          loader.run()
+        } finally {
+          RddInputUtils.remove(rddIteratorKey)
+        }
+      } catch {
+        case e: Exception =>
+          logInfo("DataLoad failure")
+          LOGGER.error(e)
+          throw e
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = !finished
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    resultIter
+  }
+
+  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
+
+class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
+  def hasNext: Boolean = partitionIter.hasNext
+  def next: JavaRddIterator[Array[String]] = {
+    val value = partitionIter.next
+    new RddIterator(value.rdd.iterator(value.partition, context),
+        carbonLoadModel,
+        context)
+  }
+  def initialize: Unit = {
+    SparkUtil.setTaskContext(context)
+  }
+}
+/**
+ * This class wraps a Scala Iterator as a Java Iterator.
+ * It also converts all columns to strings so the CSV data loading flow can be reused.
+ *
+ * @param rddIter         the underlying row iterator
+ * @param carbonLoadModel Carbon load model which contains the load info
+ * @param context         the Spark task context
+ */
+class RddIterator(rddIter: Iterator[Row],
+                  carbonLoadModel: CarbonLoadModel,
+                  context: TaskContext) extends JavaRddIterator[Array[String]] {
+
+  val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  val format = new SimpleDateFormat(formatString)
+  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  val serializationNullFormat =
+      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+  def hasNext: Boolean = rddIter.hasNext
+
+  def next: Array[String] = {
+    val row = rddIter.next()
+    val columns = new Array[String](row.length)
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+          delimiterLevel1, delimiterLevel2, format)
+    }
+    columns
+  }
+
+  def initialize: Unit = {
+    SparkUtil.setTaskContext(context)
+  }
+
+}
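[Editor's note] The row-to-string conversion above can be pictured with the following simplified, hypothetical sketch. The real iterator delegates to CarbonScalaUtil.getString and honours the configured serialization null format and complex-type delimiters, which this toy version deliberately omits.

import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.spark.sql.Row

object RowToCsvSketch {
  // Simplified stand-in for CarbonScalaUtil.getString: nulls become the configured
  // null token, timestamps are formatted, everything else is stringified.
  def toStringColumns(row: Row, nullFormat: String, format: SimpleDateFormat): Array[String] = {
    (0 until row.length).map { i =>
      row.get(i) match {
        case null         => nullFormat
        case t: Timestamp => format.format(t)
        case other        => other.toString
      }
    }.toArray
  }

  def main(args: Array[String]): Unit = {
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val row = Row(1, null, Timestamp.valueOf("2016-11-30 07:51:48"))
    // prints: 1,\N,2016-11-30 07:51:48
    println(toStringColumns(row, "\\N", format).mkString(","))
  }
}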

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
new file mode 100644
index 0000000..0534def
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.spark.DeletedLoadResult
+import org.apache.carbondata.spark.load.DeletedLoadMetadata
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonDeleteLoadByDateRDD[K, V](
+    sc: SparkContext,
+    result: DeletedLoadResult[K, V],
+    databaseName: String,
+    tableName: String,
+    dateField: String,
+    dateFieldActualName: String,
+    dateValue: String,
+    factTableName: String,
+    dimTableName: String,
+    storePath: String,
+    loadMetadataDetails: List[LoadMetadataDetails])
+  extends RDD[(K, V)](sc, Nil) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  override def getPartitions: Array[Partition] = {
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+    splits.zipWithIndex.map {s =>
+      new CarbonLoadPartition(id, s._2, s._1)
+    }
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    new Iterator[(K, V)] {
+      val deletedMetaData = new DeletedLoadMetadata()
+      val split = theSplit.asInstanceOf[CarbonLoadPartition]
+      logInfo("Input split: " + split.serializableHadoopSplit.value)
+
+      logInfo("Input split: " + split.serializableHadoopSplit.value)
+      val partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+
+      // TODO call CARBON delete API
+      logInfo("Applying data retention as per date value " + dateValue)
+      var dateFormat = ""
+      try {
+        dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+      } catch {
+        case e: Exception => logInfo("Unable to parse with default time format " + dateValue)
+      }
+      // TODO: Implement it
+      var finished = false
+
+      override def hasNext: Boolean = {
+        finished
+      }
+
+      override def next(): (K, V) = {
+        result.getKey(null, null)
+      }
+    }
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[CarbonLoadPartition]
+    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
+    logInfo("Host Name: " + s.head + s.length)
+    s
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
new file mode 100644
index 0000000..26e1abc
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+
+import org.apache.carbondata.spark.Value
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonDeleteLoadRDD[V: ClassTag](
+    sc: SparkContext,
+    valueClass: Value[V],
+    loadId: Int,
+    databaseName: String,
+    tableName: String,
+    partitioner: Partitioner)
+  extends RDD[V](sc, Nil) {
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  override def getPartitions: Array[Partition] = {
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+    splits.zipWithIndex.map {f =>
+      new CarbonLoadPartition(id, f._2, f._1)
+    }
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+    val iter = new Iterator[V] {
+      val split = theSplit.asInstanceOf[CarbonLoadPartition]
+      logInfo("Input split: " + split.serializableHadoopSplit.value)
+      // TODO call CARBON delete API
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): V = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        valueClass.getValue(null)
+      }
+
+    }
+    logInfo("********Deleting***************")
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[CarbonLoadPartition]
+    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
+    logInfo("Host Name: " + s.head + s.length)
+    s
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
new file mode 100644
index 0000000..dc63098
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.spark.Value
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonDropTableRDD[V: ClassTag](
+    sc: SparkContext,
+    valueClass: Value[V],
+    databaseName: String,
+    tableName: String)
+  extends RDD[V](sc, Nil) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  override def getPartitions: Array[Partition] = {
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+    splits.zipWithIndex.map { s =>
+      new CarbonLoadPartition(id, s._2, s._1)
+    }
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+
+    val iter = new Iterator[V] {
+      // TODO: Clear Btree from memory
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): V = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        valueClass.getValue(null)
+      }
+    }
+    iter
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
new file mode 100644
index 0000000..3c15818
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.{DataInputStream, InputStreamReader}
+import java.nio.charset.Charset
+import java.text.SimpleDateFormat
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.util.control.Breaks.{break, breakable}
+
+import au.com.bytecode.opencsv.CSVReader
+import org.apache.commons.lang3.{ArrayUtils, StringUtils}
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.spark.util.GlobalDictionaryUtil
+
+/**
+ * A partitioner that partitions by column.
+ *
+ * @constructor create a partitioner
+ * @param numParts the number of partitions
+ */
+class ColumnPartitioner(numParts: Int) extends Partitioner {
+  override def numPartitions: Int = numParts
+
+  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
+}
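[Editor's note] A small, hypothetical example of how such a partitioner might be used, assuming a local SparkContext; the data values and object name are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.carbondata.spark.rdd.ColumnPartitioner

object ColumnPartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("column-partitioner").setMaster("local[2]"))
    // key = column index, value = a distinct value observed for that column
    val columnValues = sc.parallelize(Seq((0, "red"), (1, "small"), (0, "blue"), (1, "large")))
    // one partition per column index, so each column's values land in a single task
    val byColumn = columnValues.partitionBy(new ColumnPartitioner(2))
    byColumn.glom().collect().foreach(partition => println(partition.mkString(", ")))
    sc.stop()
  }
}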
+
+trait GenericParser {
+  val dimension: CarbonDimension
+
+  def addChild(child: GenericParser): Unit
+
+  def parseString(input: String): Unit
+}
+
+case class DictionaryStats(distinctValues: java.util.List[String],
+    dictWriteTime: Long, sortIndexWriteTime: Long)
+
+case class PrimitiveParser(dimension: CarbonDimension,
+    setOpt: Option[HashSet[String]]) extends GenericParser {
+  val (hasDictEncoding, set: HashSet[String]) = setOpt match {
+    case None => (false, new HashSet[String])
+    case Some(x) => (true, x)
+  }
+
+  def addChild(child: GenericParser): Unit = {
+  }
+
+  def parseString(input: String): Unit = {
+    if (hasDictEncoding && input != null) {
+      set.add(input)
+    }
+  }
+}
+
+case class ArrayParser(dimension: CarbonDimension, format: DataFormat) extends GenericParser {
+  var children: GenericParser = _
+
+  def addChild(child: GenericParser): Unit = {
+    children = child
+  }
+
+  def parseString(input: String): Unit = {
+    if (StringUtils.isNotEmpty(input)) {
+      val splits = format.getSplits(input)
+      if (ArrayUtils.isNotEmpty(splits)) {
+        splits.foreach { s =>
+          children.parseString(s)
+        }
+      }
+    }
+  }
+}
+
+case class StructParser(dimension: CarbonDimension,
+    format: DataFormat) extends GenericParser {
+  val children = new ArrayBuffer[GenericParser]
+
+  def addChild(child: GenericParser): Unit = {
+    children += child
+  }
+
+  def parseString(input: String): Unit = {
+    if (StringUtils.isNotEmpty(input)) {
+      val splits = format.getSplits(input)
+      val len = Math.min(children.length, splits.length)
+      for (i <- 0 until len) {
+        children(i).parseString(splits(i))
+      }
+    }
+  }
+}
+
+case class DataFormat(delimiters: Array[String],
+    var delimiterIndex: Int,
+    patterns: Array[Pattern]) extends Serializable {
+  self =>
+  def getSplits(input: String): Array[String] = {
+    // split with -1 so that a trailing empty column is kept; the surrogate key has to be
+    // generated for the empty value too
+    patterns(delimiterIndex).split(input, -1)
+  }
+
+  def cloneAndIncreaseIndex: DataFormat = {
+    DataFormat(delimiters, Math.min(delimiterIndex + 1, delimiters.length - 1), patterns)
+  }
+}
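[Editor's note] To make the delimiter handling concrete, here is a small hypothetical sketch that wires an ArrayParser over a PrimitiveParser to collect the distinct values of an array column, assuming '$' and ':' are the level-1 and level-2 complex delimiters and that `dim` is a CarbonDimension taken from the table schema. The real code builds these parsers via GlobalDictionaryUtil.createDimensionParsers; this sketch only shows the idea.

import java.util.regex.Pattern
import scala.collection.mutable.HashSet
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.spark.rdd.{ArrayParser, DataFormat, PrimitiveParser}

object ComplexParserSketch {
  // Collect the distinct primitive values of an array<string> column such as "red$green$blue".
  def collectDistinct(dim: CarbonDimension, rawValues: Seq[String]): HashSet[String] = {
    val distinct = new HashSet[String]
    val delimiters = Array("$", ":")
    val level1 = DataFormat(delimiters, 0, delimiters.map(d => Pattern.compile(Pattern.quote(d))))
    val arrayParser = ArrayParser(dim, level1)
    // the child parser records every element it sees into the shared set
    arrayParser.addChild(PrimitiveParser(dim, Some(distinct)))
    rawValues.foreach(arrayParser.parseString)
    distinct
  }
}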
+
+/**
+ * A case class packaging the attributes needed for dictionary loading
+ */
+case class DictionaryLoadModel(table: CarbonTableIdentifier,
+    dimensions: Array[CarbonDimension],
+    hdfsLocation: String,
+    dictfolderPath: String,
+    dictFilePaths: Array[String],
+    dictFileExists: Array[Boolean],
+    isComplexes: Array[Boolean],
+    primDimensions: Array[CarbonDimension],
+    delimiters: Array[String],
+    highCardIdentifyEnable: Boolean,
+    highCardThreshold: Int,
+    rowCountPercentage: Double,
+    columnIdentifier: Array[ColumnIdentifier],
+    isFirstLoad: Boolean,
+    hdfsTempLocation: String,
+    lockType: String,
+    zooKeeperUrl: String,
+    serializationNullFormat: String) extends Serializable
+
+case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
+
+/**
+ * An RDD to combine all dictionary distinct values.
+ *
+ * @constructor create an RDD from an RDD[(String, Iterable[String])]
+ * @param prev  the input RDD[(String, Iterable[String])]
+ * @param model a model packaging the load info
+ */
+class CarbonAllDictionaryCombineRDD(
+    prev: RDD[(String, Iterable[String])],
+    model: DictionaryLoadModel)
+  extends RDD[(Int, ColumnDistinctValues)](prev) {
+
+  override def getPartitions: Array[Partition] = {
+    firstParent[(String, Iterable[String])].partitions
+  }
+
+  override def compute(split: Partition, context: TaskContext
+  ): Iterator[(Int, ColumnDistinctValues)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+    val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
+    /*
+     * for the all-dictionary case, every column needs encoding and the
+     * isHighCardinalityColumn check, so there is no need to calculate the row count
+     */
+    val rowCount = 0L
+    try {
+      val dimensionParsers =
+        GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
+      val dimNum = model.dimensions.length
+      // Map[dimColName -> dimColNameIndex]
+      val columnIndexMap = new HashMap[String, Int]()
+      for (j <- 0 until dimNum) {
+        columnIndexMap.put(model.dimensions(j).getColName, j)
+      }
+
+      var row: (String, Iterable[String]) = null
+      val rddIter = firstParent[(String, Iterable[String])].iterator(split, context)
+      // generate block distinct value set
+      while (rddIter.hasNext) {
+        row = rddIter.next()
+        if (row != null) {
+          columnIndexMap.get(row._1) match {
+            case Some(index) =>
+              for (record <- row._2) {
+                dimensionParsers(index).parseString(record)
+              }
+            case None =>
+          }
+        }
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex)
+        throw ex
+    }
+
+    distinctValuesList.map { iter =>
+      val valueList = iter._2.toArray
+      (iter._1, ColumnDistinctValues(valueList, rowCount))
+    }.iterator
+  }
+}
+
+/**
+ * An RDD to combine the distinct values of each block.
+ *
+ * @constructor create an RDD from an RDD[Row]
+ * @param prev  the input RDD[Row]
+ * @param model a model packaging the load info
+ */
+class CarbonBlockDistinctValuesCombineRDD(
+    prev: RDD[Row],
+    model: DictionaryLoadModel)
+  extends RDD[(Int, ColumnDistinctValues)](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[Row].partitions
+
+  override def compute(split: Partition,
+      context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
+    val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
+    var rowCount = 0L
+    try {
+      val dimensionParsers =
+        GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
+      val dimNum = model.dimensions.length
+      var row: Row = null
+      val rddIter = firstParent[Row].iterator(split, context)
+      val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+          .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      val format = new SimpleDateFormat(formatString)
+      // generate block distinct value set
+      while (rddIter.hasNext) {
+        row = rddIter.next()
+        if (row != null) {
+          rowCount += 1
+          for (i <- 0 until dimNum) {
+            dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
+                model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format))
+          }
+        }
+      }
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex)
+        throw ex
+    }
+
+    distinctValuesList.map { iter =>
+      val valueList = iter._2.toArray
+      (iter._1, ColumnDistinctValues(valueList, rowCount))
+    }.iterator
+  }
+}
+
+/**
+ * An RDD to generate the dictionary file for each column
+ *
+ * @constructor create an RDD from an RDD[(Int, ColumnDistinctValues)]
+ * @param prev  the input RDD[(Int, ColumnDistinctValues)]
+ * @param model a model packaging the load info
+ */
+class CarbonGlobalDictionaryGenerateRDD(
+    prev: RDD[(Int, ColumnDistinctValues)],
+    model: DictionaryLoadModel)
+  extends RDD[(Int, String, Boolean)](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+    var isHighCardinalityColumn = false
+    val iter = new Iterator[(Int, String, Boolean)] {
+      var dictionaryForDistinctValueLookUp:
+      org.apache.carbondata.core.cache.dictionary.Dictionary = _
+      var dictionaryForSortIndexWriting: org.apache.carbondata.core.cache.dictionary.Dictionary = _
+      var dictionaryForDistinctValueLookUpCleared: Boolean = false
+      val pathService = CarbonCommonFactory.getPathService
+      val carbonTablePath = pathService.getCarbonTablePath(model.hdfsLocation, model.table)
+      if (StringUtils.isNotBlank(model.hdfsTempLocation)) {
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
+          model.hdfsTempLocation)
+      }
+      if (StringUtils.isNotBlank(model.lockType)) {
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+          model.lockType)
+      }
+      if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+          model.zooKeeperUrl)
+      }
+      val dictLock = CarbonLockFactory
+        .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
+          model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)
+      var isDictionaryLocked = false
+      // generate distinct value list
+      try {
+        val t1 = System.currentTimeMillis
+        val valuesBuffer = new mutable.HashSet[String]
+        val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
+        var rowCount = 0L
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
+        breakable {
+          while (rddIter.hasNext) {
+            val distinctValueList = rddIter.next()._2
+            valuesBuffer ++= distinctValueList.values
+            rowCount += distinctValueList.rowCount
+            // check high cardinality
+            if (model.isFirstLoad && model.highCardIdentifyEnable
+                && !model.isComplexes(split.index)
+                && model.dimensions(split.index).isColumnar) {
+              isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
+                valuesBuffer.size, rowCount, model)
+              if (isHighCardinalityColumn) {
+                break
+              }
+            }
+          }
+        }
+        val combineListTime = System.currentTimeMillis() - t1
+        if (isHighCardinalityColumn) {
+          LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
+                      s"${
+                        model.primDimensions(split.index)
+                          .getColName
+                      } is high cardinality column")
+        } else {
+          isDictionaryLocked = dictLock.lockWithRetries()
+          if (isDictionaryLocked) {
+            logInfo(s"Successfully able to get the dictionary lock for ${
+              model.primDimensions(split.index).getColName
+            }")
+          } else {
+            sys.error(s"Dictionary file ${
+              model.primDimensions(split.index).getColName
+            } is locked for update. Please try again after some time")
+          }
+          val t2 = System.currentTimeMillis
+          val fileType = FileFactory.getFileType(model.dictFilePaths(split.index))
+          model.dictFileExists(split.index) = FileFactory
+            .isFileExist(model.dictFilePaths(split.index), fileType)
+          dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) {
+            CarbonLoaderUtil.getDictionary(model.table,
+              model.columnIdentifier(split.index),
+              model.hdfsLocation,
+              model.primDimensions(split.index).getDataType
+            )
+          } else {
+            null
+          }
+          val dictCacheTime = System.currentTimeMillis - t2
+          val t3 = System.currentTimeMillis()
+          val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
+            dictionaryForDistinctValueLookUp,
+            model,
+            split.index)
+          // execute dictionary writer task to get distinct values
+          val distinctValues = dictWriteTask.execute()
+          val dictWriteTime = System.currentTimeMillis() - t3
+          val t4 = System.currentTimeMillis()
+          // if new data came in, then rewrite the sort index file
+          if (distinctValues.size() > 0) {
+            val sortIndexWriteTask = new SortIndexWriterTask(model,
+              split.index,
+              dictionaryForDistinctValueLookUp,
+              distinctValues)
+            sortIndexWriteTask.execute()
+          }
+          val sortIndexWriteTime = System.currentTimeMillis() - t4
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
+          // After sortIndex writing, update dictionaryMeta
+          dictWriteTask.updateMetaData()
+          // clear the value buffer after writing dictionary data
+          valuesBuffer.clear
+          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
+          dictionaryForDistinctValueLookUpCleared = true
+          LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
+                      s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
+                      s"\n new distinct values count: ${ distinctValues.size() }" +
+                      s"\n combine lists: $combineListTime" +
+                      s"\n create dictionary cache: $dictCacheTime" +
+                      s"\n sort list, distinct and write: $dictWriteTime" +
+                      s"\n write sort info: $sortIndexWriteTime")
+        }
+      } catch {
+        case ex: Exception =>
+          LOGGER.error(ex)
+          throw ex
+      } finally {
+        if (!dictionaryForDistinctValueLookUpCleared) {
+          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
+        }
+        CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
+        if (dictLock != null && isDictionaryLocked) {
+          if (dictLock.unlock()) {
+            logInfo(s"Dictionary ${
+              model.primDimensions(split.index).getColName
+            } Unlocked Successfully.")
+          } else {
+            logError(s"Unable to unlock Dictionary ${
+              model.primDimensions(split.index).getColName
+            }")
+          }
+        }
+      }
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished) {
+          finished = true
+          true
+        } else {
+          false
+        }
+      }
+
+      override def next(): (Int, String, Boolean) = {
+        (split.index, status, isHighCardinalityColumn)
+      }
+    }
+
+    iter
+  }
+
+}
+
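[Editor's note] Putting the three RDDs above together: a plausible driver-side wiring (a sketch only; the full orchestration is presumably handled by GlobalDictionaryUtil, and `dictModel` is assumed to be a prepared DictionaryLoadModel) is to collect per-block distinct values, shuffle them so that each column owns one partition, and then let each task write that column's dictionary.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.carbondata.spark.rdd.{CarbonBlockDistinctValuesCombineRDD,
  CarbonGlobalDictionaryGenerateRDD, ColumnPartitioner, DictionaryLoadModel}

object GlobalDictionarySketch {
  // Returns one (column index, load status, isHighCardinality) triple per dictionary column.
  def generateDictionaries(input: RDD[Row],
      dictModel: DictionaryLoadModel): Array[(Int, String, Boolean)] = {
    val distinctPerBlock = new CarbonBlockDistinctValuesCombineRDD(input, dictModel)
    // one partition per primitive dimension, so each dictionary file is written by exactly one task
    val byColumn = distinctPerBlock.partitionBy(new ColumnPartitioner(dictModel.primDimensions.length))
    new CarbonGlobalDictionaryGenerateRDD(byColumn, dictModel).collect()
  }
}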
+/**
+ * Column dictionary partition, one partition per pre-defined dictionary column
+ *
+ * @param id        partition id
+ * @param dimension current carbon dimension
+ */
+class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
+  extends Partition {
+  override val index: Int = id
+  val preDefDictDimension = dimension
+}
+
+
+/**
+ * Use an external column dictionary to generate the global dictionary
+ *
+ * @param carbonLoadModel     carbon load model
+ * @param dictionaryLoadModel dictionary load model
+ * @param sparkContext        spark context
+ * @param table               carbon table identifier
+ * @param dimensions          carbon dimensions having a predefined dictionary
+ * @param hdfsLocation        carbon base store path
+ * @param dictFolderPath      path of the dictionary folder
+ */
+class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
+    dictionaryLoadModel: DictionaryLoadModel,
+    sparkContext: SparkContext,
+    table: CarbonTableIdentifier,
+    dimensions: Array[CarbonDimension],
+    hdfsLocation: String,
+    dictFolderPath: String)
+  extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    val primDimensions = dictionaryLoadModel.primDimensions
+    val primDimLength = primDimensions.length
+    val result = new Array[Partition](primDimLength)
+    for (i <- 0 until primDimLength) {
+      result(i) = new CarbonColumnDictPatition(i, primDimensions(i))
+    }
+    result
+  }
+
+  override def compute(split: Partition, context: TaskContext)
+  : Iterator[(Int, ColumnDistinctValues)] = {
+    val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
+    val primDimension = theSplit.preDefDictDimension
+    // read the column dict data
+    val preDefDictFilePath = carbonLoadModel.getPredefDictFilePath(primDimension)
+    var csvReader: CSVReader = null
+    var inputStream: DataInputStream = null
+    var colDictData: java.util.Iterator[Array[String]] = null
+    try {
+      inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
+        FileFactory.getFileType(preDefDictFilePath))
+      csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
+        carbonLoadModel.getCsvDelimiter.charAt(0))
+      // read the column data to list iterator
+      colDictData = csvReader.readAll.iterator
+    } catch {
+      case ex: Exception =>
+        logError(s"Error in reading pre-defined " +
+                 s"dictionary file:${ ex.getMessage }")
+        throw ex
+    } finally {
+      if (csvReader != null) {
+        try {
+          csvReader.close()
+        } catch {
+          case ex: Exception =>
+            logError(s"Error in closing csvReader of " +
+                     s"pre-defined dictionary file:${ ex.getMessage }")
+        }
+      }
+      if (inputStream != null) {
+        try {
+          inputStream.close()
+        } catch {
+          case ex: Exception =>
+            logError(s"Error in closing inputStream of " +
+                     s"pre-defined dictionary file:${ ex.getMessage }")
+        }
+      }
+    }
+    val mapIdWithSet = new HashMap[String, HashSet[String]]
+    val columnValues = new HashSet[String]
+    val distinctValues = (theSplit.index, columnValues)
+    mapIdWithSet.put(primDimension.getColumnId, columnValues)
+    // use parser to generate new dict value
+    val dimensionParser = GlobalDictionaryUtil.generateParserForDimension(
+      Some(primDimension),
+      GlobalDictionaryUtil.createDataFormat(carbonLoadModel.getDelimiters),
+      mapIdWithSet).get
+    // parse the column data
+    while (colDictData.hasNext) {
+      dimensionParser.parseString(colDictData.next()(0))
+    }
+    Array((distinctValues._1,
+      ColumnDistinctValues(distinctValues._2.toArray, 0L))).iterator
+  }
+}
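compute() above boils down to: read the pre-defined dictionary file for one dimension, run each
first-column value through the dimension parser, and ship the distinct values back as a single
ColumnDistinctValues record. A plain-Scala sketch of that per-partition work (the file path and
delimiter are hypothetical stand-ins, and scala.io.Source replaces the CSVReader/FileFactory
plumbing used by the committed code):

    import scala.io.Source

    def distinctColumnValues(preDefDictFilePath: String, delimiter: Char = ','): Array[String] = {
      val source = Source.fromFile(preDefDictFilePath)
      try {
        source.getLines()
          .map(_.split(delimiter).headOption.getOrElse(""))   // keep only the first column
          .filter(_.nonEmpty)                                  // ignore empty entries
          .toSet                                               // distinct values
          .toArray
      } finally {
        source.close()
      }
    }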

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
new file mode 100644
index 0000000..c9e3b6b
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.{Collections, List}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
+import org.apache.spark.sql.hive.DistributionUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.spark.MergeResult
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.spark.splits.TableSplit
+
+class CarbonMergerRDD[K, V](
+    sc: SparkContext,
+    result: MergeResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    carbonMergerMapping: CarbonMergerMapping,
+    confExecutorsTemp: String)
+  extends RDD[(K, V)](sc, Nil) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+  sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+
+  var storeLocation: String = null
+  val storePath = carbonMergerMapping.storePath
+  val metadataFilePath = carbonMergerMapping.metadataFilePath
+  val mergedLoadName = carbonMergerMapping.mergedLoadName
+  val databaseName = carbonMergerMapping.databaseName
+  val factTableName = carbonMergerMapping.factTableName
+  val tableId = carbonMergerMapping.tableId
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val iter = new Iterator[(K, V)] {
+
+      carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+      val tempLocationKey: String = CarbonCommonConstants
+                                      .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+                                      .getDatabaseName + '_' + carbonLoadModel
+                                      .getTableName + '_' + carbonLoadModel.getTaskNo
+
+      // this property determines whether the temp location for carbon is inside the
+      // YARN container's configured local directories or the JVM temp directory.
+      val carbonUseLocalDir = CarbonProperties.getInstance()
+        .getProperty("carbon.use.local.dir", "false")
+
+      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+        val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+        if (null != storeLocations && storeLocations.nonEmpty) {
+          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+        }
+        if (storeLocation == null) {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
+      } else {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
+      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+      var mergeStatus = false
+      var mergeNumber = ""
+      var exec: CarbonCompactionExecutor = null
+      try {
+        val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
+
+        // get the destination segment properties as sent from the driver;
+        // they are taken from the last segment.
+
+        val segmentProperties = new SegmentProperties(
+          carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
+          carbonMergerMapping.maxSegmentColCardinality)
+
+        // sorting the table block info List.
+        val splitList = carbonSparkPartition.split.value.getAllSplits
+        val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
+
+        Collections.sort(tableBlockInfoList)
+
+        val segmentMapping: java.util.Map[String, TaskBlockInfo] =
+          CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
+
+        val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
+          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
+
+        carbonLoadModel.setStorePath(storePath)
+
+        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+          factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+          dataFileMetadataSegMapping
+        )
+
+        // fire a query and get the results.
+        var result2: util.List[RawResultIterator] = null
+        try {
+          result2 = exec.processTableBlocks()
+        } catch {
+          case e: Throwable =>
+            if (null != exec) {
+              exec.finish()
+            }
+            LOGGER.error(e)
+            if (null != e.getMessage) {
+              sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
+            } else {
+              sys.error("Exception occurred in query execution.Please check logs.")
+            }
+        }
+        mergeNumber = mergedLoadName
+          .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+                     CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+          )
+
+        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+          factTableName,
+          carbonLoadModel.getTaskNo,
+          "0",
+          mergeNumber,
+          true
+        )
+
+        carbonLoadModel.setSegmentId(mergeNumber)
+        carbonLoadModel.setPartitionId("0")
+        val merger =
+          new RowResultMerger(result2,
+            databaseName,
+            factTableName,
+            segmentProperties,
+            tempStoreLoc,
+            carbonLoadModel,
+            carbonMergerMapping.maxSegmentColCardinality
+          )
+        mergeStatus = merger.mergerSlice()
+
+      } catch {
+        case e: Exception =>
+          LOGGER.error(e)
+          throw e
+      } finally {
+        // delete temp location data
+        val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
+        try {
+          val isCompactionFlow = true
+          CarbonLoaderUtil
+            .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
+        } catch {
+          case e: Exception =>
+            LOGGER.error(e)
+        }
+        if (null != exec) {
+          exec.finish()
+        }
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished) {
+          finished = true
+          finished
+        } else {
+          !finished
+        }
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(0, mergeStatus)
+      }
+
+    }
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[CarbonSparkPartition]
+    theSplit.split.value.getLocations.filter(_ != "localhost")
+  }
+
+  override def getPartitions: Array[Partition] = {
+    val startTime = System.currentTimeMillis()
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
+      storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+    )
+    val jobConf: JobConf = new JobConf(new Configuration)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    var defaultParallelism = sparkContext.defaultParallelism
+    val result = new util.ArrayList[Partition](defaultParallelism)
+
+    // mapping of the node and block list.
+    var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new
+        util.HashMap[String, util.List[Distributable]]
+
+    var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+
+    // for each valid segment.
+    for (eachSeg <- carbonMergerMapping.validSegments) {
+
+      // map for keeping the relation of a task and its blocks.
+      job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+
+      // get splits
+      val splits = format.getSplits(job)
+      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+    }
+
+    // prepare the details required to extract the segment properties using last segment.
+    if (carbonInputSplits.nonEmpty) {
+      val carbonInputSplit = carbonInputSplits.last
+      var dataFileFooter: DataFileFooter = null
+
+      try {
+        dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
+          carbonInputSplit.getStart, carbonInputSplit.getLength)
+      } catch {
+        case e: CarbonUtilException =>
+          logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+          throw e
+      }
+
+      carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
+        .getColumnCardinality
+      carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
+        .toList
+    }
+    // send complete list of blocks to the mapping util.
+    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
+      carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
+
+    val confExecutors = confExecutorsTemp.toInt
+    val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
+      confExecutors
+    } else { nodeBlockMapping.size() }
+    DistributionUtil.ensureExecutors(sparkContext, requiredExecutors)
+    logInfo("No.of Executors required=" + requiredExecutors +
+            " , spark.executor.instances=" + confExecutors +
+            ", no.of.nodes where data present=" + nodeBlockMapping.size())
+    var nodes = DistributionUtil.getNodeList(sparkContext)
+    var maxTimes = 30
+    while (nodes.length < requiredExecutors && maxTimes > 0) {
+      Thread.sleep(500)
+      nodes = DistributionUtil.getNodeList(sparkContext)
+      maxTimes = maxTimes - 1
+    }
+    logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
+    defaultParallelism = sparkContext.defaultParallelism
+    var i = 0
+
+    val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]()
+
+    // Create Spark Partition for each task and assign blocks
+    nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
+      val taskBlockList = new util.ArrayList[NodeInfo](0)
+      nodeTaskBlocksMap.put(nodeName, taskBlockList)
+      var blockletCount = 0
+      blockList.asScala.foreach { taskInfo =>
+        val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit]
+        blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets
+        taskBlockList.add(
+          NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets))
+      }
+      if (blockletCount != 0) {
+        val splitsForNode = blockList.asScala.map(_.asInstanceOf[CarbonInputSplit]).asJava
+        val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
+          splitsForNode, nodeName)
+        result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
+        i += 1
+      }
+    }
+
+    // print the node info along with task and number of blocks for the task.
+
+    nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
+      logInfo(s"for the node ${ entry._1 }")
+      for (elem <- entry._2.asScala) {
+        logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
+      }
+    })
+
+    val noOfNodes = nodes.length
+    val noOfTasks = result.size
+    val noOfBlocks = carbonInputSplits.size
+    logInfo(s"Identified no.of.Blocks: $noOfBlocks, " +
+            s"parallelism: $defaultParallelism, no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
+    logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
+    for (j <- 0 until result.size ) {
+      val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+      val splitList = multiBlockSplit.getAllSplits
+      logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
+              s"${CarbonInputSplit.createBlocks(splitList).size}")
+    }
+    result.toArray(new Array[Partition](result.size))
+  }
+
+}
+
+class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
+  extends Partition {
+
+  override val index: Int = idx
+  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
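getPartitions above groups the input splits by hosting node (CarbonLoaderUtil.nodeBlockMapping)
and creates one CarbonSparkPartition per node that actually has blocklets to merge, after waiting
for enough executors to come up. The planning step, reduced to plain Scala with illustrative types
that are not part of the committed file:

    case class Block(node: String, blocklets: Int)

    // one task index per node that has at least one blocklet to merge
    def planTasks(blocks: Seq[Block]): Seq[(Int, String, Seq[Block])] = {
      blocks.groupBy(_.node)                                              // node -> its blocks
        .filter { case (_, nodeBlocks) => nodeBlocks.map(_.blocklets).sum > 0 }
        .toSeq
        .zipWithIndex                                                     // assign task indices
        .map { case ((node, nodeBlocks), taskIdx) => (taskIdx, node, nodeBlocks) }
    }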

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
new file mode 100644
index 0000000..82a471f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.{Partition, SerializableWritable}
+
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+
+class CarbonSparkPartition(
+    val rddId: Int,
+    val idx: Int,
+    @transient val multiBlockSplit: CarbonMultiBlockSplit)
+    extends Partition {
+
+  val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
+
+  override val index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
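CarbonMultiBlockSplit is a Hadoop Writable and Writables are typically not Java-serializable, so
the partition wraps it in Spark's SerializableWritable before it travels inside the task closure.
A tiny illustration of the same wrapping with a stand-in Writable (org.apache.hadoop.io.Text);
this is a sketch only, not part of the commit:

    import org.apache.hadoop.io.Text
    import org.apache.spark.SerializableWritable

    object SerializableWritableExample {
      def main(args: Array[String]): Unit = {
        // the Writable is (de)serialized via its own write()/readFields() methods
        val wrapped = new SerializableWritable[Text](new Text("Segment_0"))
        println(wrapped.value)
      }
    }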

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
new file mode 100644
index 0000000..fe805fe
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+
+/**
+ * Compactor object which handles the compaction cases.
+ */
+object Compactor {
+
+  val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
+
+  def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+
+    val storePath = compactionCallableModel.storePath
+    val storeLocation = compactionCallableModel.storeLocation
+    val carbonTable = compactionCallableModel.carbonTable
+    val kettleHomePath = compactionCallableModel.kettleHomePath
+    val cubeCreationTime = compactionCallableModel.cubeCreationTime
+    val loadsToMerge = compactionCallableModel.loadsToMerge
+    val sc = compactionCallableModel.sqlContext
+    val carbonLoadModel = compactionCallableModel.carbonLoadModel
+    val compactionType = compactionCallableModel.compactionType
+
+    val startTime = System.nanoTime()
+    val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+    var finalMergeStatus = false
+    val schemaName: String = carbonLoadModel.getDatabaseName
+    val factTableName = carbonLoadModel.getTableName
+    val validSegments: Array[String] = CarbonDataMergerUtil
+      .getValidSegments(loadsToMerge).split(',')
+    val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
+    val carbonMergerMapping = CarbonMergerMapping(storeLocation,
+      storePath,
+      carbonTable.getMetaDataFilepath,
+      mergedLoadName,
+      kettleHomePath,
+      cubeCreationTime,
+      schemaName,
+      factTableName,
+      validSegments,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+      maxSegmentColCardinality = null,
+      maxSegmentColumnSchemaList = null
+    )
+    carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
+    carbonLoadModel.setLoadMetadataDetails(
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+    var execInstance = "1"
+    // in case of non-dynamic executor allocation, the number of executors is fixed.
+    if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
+      execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
+      logger.info(s"spark.executor.instances property is set to = $execInstance")
+    } else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
+      // in case of dynamic executor allocation, take the max executors of the dynamic allocation
+      if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+        .equalsIgnoreCase("true")) {
+        execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+        logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
+      }
+    }
+
+    val mergeStatus = new CarbonMergerRDD(
+      sc.sparkContext,
+      new MergeResultImpl(),
+      carbonLoadModel,
+      carbonMergerMapping,
+      execInstance
+    ).collect
+
+    finalMergeStatus = mergeStatus.nonEmpty && mergeStatus.forall(_._2)
+
+    if (finalMergeStatus) {
+      val endTime = System.nanoTime()
+      logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
+      if (!CarbonDataMergerUtil
+        .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
+          mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
+        )) {
+        logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+                     s"${ carbonLoadModel.getTableName }")
+        logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+                     s"${ carbonLoadModel.getTableName }")
+        throw new Exception(s"Compaction failed to update metadata for table" +
+                            s" ${ carbonLoadModel.getDatabaseName }." +
+                            s"${ carbonLoadModel.getTableName }")
+      } else {
+        logger.audit(s"Compaction request completed for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        logger.info("Compaction request completed for table ${ carbonLoadModel.getDatabaseName } " +
+                    s".${ carbonLoadModel.getTableName }")
+      }
+    } else {
+      logger.audit("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+                   s".${ carbonLoadModel.getTableName }"
+      )
+      logger.error("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+                   s".${ carbonLoadModel.getTableName }")
+      throw new Exception("Compaction Failure in Merger Rdd.")
+    }
+  }
+}
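triggerCompaction above derives the executor count from the Spark configuration: a fixed value
when spark.executor.instances is set, otherwise the dynamic-allocation maximum, otherwise 1. The
same decision, factored into a small helper as a sketch only; the default of "1" for an unset
spark.dynamicAllocation.maxExecutors is an assumption here, whereas the code above would fail fast:

    import org.apache.spark.SparkConf

    def resolveExecutorInstances(conf: SparkConf): String = {
      if (conf.contains("spark.executor.instances")) {
        conf.get("spark.executor.instances")                          // static allocation
      } else if (conf.getBoolean("spark.dynamicAllocation.enabled", defaultValue = false)) {
        conf.get("spark.dynamicAllocation.maxExecutors", "1")         // dynamic allocation cap
      } else {
        "1"                                                           // single executor fallback
      }
    }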

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
new file mode 100644
index 0000000..7395e43
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+
+case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
+
+class DataLoadCoalescedRDD[T: ClassTag](
+  @transient var prev: RDD[T],
+  nodeList: Array[String])
+    extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    new DataLoadPartitionCoalescer(prev, nodeList).run
+  }
+
+  override def compute(split: Partition,
+      context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+
+    new Iterator[DataLoadPartitionWrap[T]] {
+      val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
+      def hasNext = iter.hasNext
+      def next: DataLoadPartitionWrap[T] = {
+        DataLoadPartitionWrap(firstParent[T], iter.next())
+      }
+    }
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] =
+        partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
+    })
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
+
+  /**
+   * Returns the preferred machine for the partition. If the partition is of type
+   * CoalescedRDDPartition, then the preferred machine is the one that most of its
+   * parent splits prefer too.
+   * @param partition the coalesced partition
+   * @return the machine most preferred by the partition
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
+  }
+}
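A sketch of how a caller might wrap an input RDD with DataLoadCoalescedRDD so that every coalesced
partition hands back (parent RDD, parent partition) pairs for node-local loading. The CSV path and
node list are hypothetical; in the load flow the node list comes from the cluster's active
executors:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}

    def coalesceForLoad(sc: SparkContext, csvPath: String,
        nodes: Array[String]): RDD[DataLoadPartitionWrap[String]] = {
      val input = sc.textFile(csvPath)               // one partition per input block
      new DataLoadCoalescedRDD[String](input, nodes) // coalesce onto the given nodes
    }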